This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch support/nifi-1.16
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit 7268e638dd1f68ccdc21fc2f940991f4e407cd21
Author: Mark Payne <[email protected]>
AuthorDate: Thu May 19 11:58:52 2022 -0400

    NIFI-10037: When system test fails to clean up flow, destroy the entire 
environment so that the next test starts in a healthy state. Name 
troubleshooting directories with the name of the test class to avoid ambiguity. 
Also added a log statement so that we know which test is running when looking 
at the log output from the tests themselves. Finally, found an issue in 
AbstractComponentNode in which we iterate over the elements in a Map and call 
setProperty, which can update the underlying map [...]
    
    This closes #6059
    
    Signed-off-by: David Handermann <[email protected]>
---
 .../nifi/controller/AbstractComponentNode.java     |  6 ++-
 .../org/apache/nifi/tests/system/NiFiSystemIT.java |  8 ++++
 .../tests/system/TroubleshootingTestWatcher.java   | 11 +++--
 .../tests/system/loadbalance/LoadBalanceIT.java    | 54 ++++++++++++++++------
 .../resources/conf/clustered/node1/nifi.properties |  2 +-
 .../resources/conf/clustered/node2/nifi.properties |  2 +-
 .../test/resources/conf/default/nifi.properties    |  2 +-
 7 files changed, 63 insertions(+), 22 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java
index edf26e46da..5f5e78c3cd 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java
@@ -66,6 +66,7 @@ import java.net.URL;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
@@ -600,7 +601,10 @@ public abstract class AbstractComponentNode implements 
ComponentNode {
         // use setProperty instead of setProperties so we can bypass the class 
loading logic.
         // Consider value changed if it is different than the 
PropertyDescriptor's default value because we need to call the 
#onPropertiesModified
         // method on the component if the current value is not the default 
value, since the component itself is being reloaded.
-        for (final Map.Entry<PropertyDescriptor, PropertyConfiguration> entry 
: this.properties.entrySet()) {
+        // Also, create a copy of this.properties instead of iterating 
directly over this.properties since the call to setProperty can change the
+        // underlying map, and the behavior of modifying the map while 
iterating over its elements is undefined.
+        final Map<PropertyDescriptor, PropertyConfiguration> 
copyOfPropertiesMap = new HashMap<>(this.properties);
+        for (final Map.Entry<PropertyDescriptor, PropertyConfiguration> entry 
: copyOfPropertiesMap.entrySet()) {
             final PropertyDescriptor propertyDescriptor = entry.getKey();
             final PropertyConfiguration configuration = entry.getValue();
 
diff --git 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java
 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java
index 341e384812..298e3ec473 100644
--- 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java
+++ 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java
@@ -73,6 +73,8 @@ public abstract class NiFiSystemIT implements 
NiFiInstanceProvider {
     @BeforeEach
     public void setup(final TestInfo testInfo) throws IOException {
         this.testInfo = testInfo;
+        final String testClassName = 
testInfo.getTestClass().map(Class::getSimpleName).orElse("<Unknown Test 
Class>");
+        logger.info("Beginning Test {}:{}", testClassName, 
testInfo.getDisplayName());
 
         
Thread.currentThread().setContextClassLoader(ClassLoader.getSystemClassLoader());
         setupClient();
@@ -116,6 +118,12 @@ public abstract class NiFiSystemIT implements 
NiFiInstanceProvider {
 
             if (isDestroyEnvironmentAfterEachTest()) {
                 cleanup();
+            } else if (destroyFlowFailure != null) {
+                // If unable to destroy the flow, we need to shutdown the 
instance and delete the flow and completely recreate the environment.
+                // Otherwise, we will be left in an unknown state for the next 
test, and that can cause cascading failures that are very difficult
+                // to understand and troubleshoot.
+                logger.info("Because there was a failure when destroying the 
flow, will completely tear down the environments and start with a clean 
environment for the next test.");
+                cleanup();
             }
 
             if (destroyFlowFailure != null) {
diff --git 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/TroubleshootingTestWatcher.java
 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/TroubleshootingTestWatcher.java
index 8d71362359..b2c1ab078f 100644
--- 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/TroubleshootingTestWatcher.java
+++ 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/TroubleshootingTestWatcher.java
@@ -40,7 +40,8 @@ public class TroubleshootingTestWatcher implements 
TestWatcher {
                 final NiFiInstanceProvider provider = (NiFiInstanceProvider) 
testInstance;
                 final String displayName = context.getDisplayName();
                 try {
-                    final File dir = quarantineTroubleshootingInfo(provider, 
displayName, cause);
+                    final String testClassName = 
context.getTestClass().map(Class::getSimpleName).orElse("TestClassUnknown");
+                    final File dir = quarantineTroubleshootingInfo(provider, 
testClassName, displayName, cause);
                     logger.info("Test Failed [{}]: Troubleshooting information 
stored [{}]", displayName, dir.getAbsolutePath());
                 } catch (final Exception e) {
                     logger.error("Test Failed [{}]: Troubleshooting 
information not stored", displayName, e);
@@ -49,17 +50,21 @@ public class TroubleshootingTestWatcher implements 
TestWatcher {
         }
     }
 
-    private File quarantineTroubleshootingInfo(final NiFiInstanceProvider 
provider, final String methodName, final Throwable failureCause) throws 
IOException {
+    private File quarantineTroubleshootingInfo(final NiFiInstanceProvider 
provider, final String testClassName, final String methodName, final Throwable 
failureCause) throws IOException {
         NiFiInstance instance = provider.getNiFiInstance();
 
         // The teardown method may or may not have already run at this point. 
If it has, the instance will be null.
         // In that case, just create a new instance and use it - it will map 
to the same directories.
         if (instance == null) {
+            logger.warn("While capturing troubleshooting info for {}, the NiFi 
Instance is not available. Will create a new one for Diagnostics purposes, but 
some of the diagnostics may be less " +
+                "accurate, since it's not the same instance that ran the 
test", methodName);
+
             instance = provider.getInstanceFactory().createInstance();
         }
 
         final File troubleshooting = new File("target/troubleshooting");
-        final File quarantineDir = new File(troubleshooting, methodName);
+        final String quarantineDirName = testClassName + "-" + 
methodName.replace("()", "");
+        final File quarantineDir = new File(troubleshooting, 
quarantineDirName);
         quarantineDir.mkdirs();
 
         instance.quarantineTroubleshootingInfo(quarantineDir, failureCause);
diff --git 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/loadbalance/LoadBalanceIT.java
 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/loadbalance/LoadBalanceIT.java
index 2e1b1c0698..5e27f24737 100644
--- 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/loadbalance/LoadBalanceIT.java
+++ 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/loadbalance/LoadBalanceIT.java
@@ -42,12 +42,13 @@ import java.io.IOException;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.LongSummaryStatistics;
 import java.util.Map;
 import java.util.Set;
 
-import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class LoadBalanceIT extends NiFiSystemIT {
     private final Logger logger = LoggerFactory.getLogger(getClass());
@@ -277,13 +278,13 @@ public class LoadBalanceIT extends NiFiSystemIT {
     private int getQueueSize(final String connectionId) {
         final ConnectionStatusEntity statusEntity = 
getConnectionStatus(connectionId);
         final ConnectionStatusDTO connectionStatusDto = 
statusEntity.getConnectionStatus();
-        return 
connectionStatusDto.getAggregateSnapshot().getFlowFilesQueued().intValue();
+        return connectionStatusDto.getAggregateSnapshot().getFlowFilesQueued();
     }
 
     private long getQueueBytes(final String connectionId) {
         final ConnectionStatusEntity statusEntity = 
getConnectionStatus(connectionId);
         final ConnectionStatusDTO connectionStatusDto = 
statusEntity.getConnectionStatus();
-        return 
connectionStatusDto.getAggregateSnapshot().getBytesQueued().longValue();
+        return connectionStatusDto.getAggregateSnapshot().getBytesQueued();
     }
 
     private boolean isConnectionDoneLoadBalancing(final String connectionId) {
@@ -372,22 +373,45 @@ public class LoadBalanceIT extends NiFiSystemIT {
         instance2.start(true);
         waitForAllNodesConnected();
 
-        // Generate the data again
         generate = 
getNifiClient().getProcessorClient().getProcessor(generate.getId());
-        getNifiClient().getProcessorClient().startProcessor(generate);
 
-        // Wait until all 20 FlowFiles are queued up
-        waitFor(() -> {
-            final ConnectionStatusEntity secondRoundStatusEntity = 
getConnectionStatus(connection.getId());
-            return 
secondRoundStatusEntity.getConnectionStatus().getAggregateSnapshot().getFlowFilesQueued()
 == 20;
-        });
+        // Generate data and wait for it to be spread across the cluster. We 
do this in an infinite while() loop because
+        // there can be a failure, in which case we'll retry. If that happens, 
we just want to keep retrying until the test
+        // times out.
+        while (true) {
+            // Generate the data.
+            getNifiClient().getProcessorClient().startProcessor(generate);
 
-        // Wait until load balancing is complete
-        waitFor(() -> isConnectionDoneLoadBalancing(connection.getId()));
+            // Wait until all 20 FlowFiles are queued up
+            waitFor(() -> {
+                final ConnectionStatusEntity secondRoundStatusEntity = 
getConnectionStatus(connection.getId());
+                return 
secondRoundStatusEntity.getConnectionStatus().getAggregateSnapshot().getFlowFilesQueued()
 == 20;
+            });
 
-        // Ensure that the FlowFiles are evenly distributed between the nodes.
-        final ConnectionStatusEntity afterSecondDataGenerationStatusEntity = 
getConnectionStatus(connection.getId());
-        assertTrue(isEvenlyDistributed(afterSecondDataGenerationStatusEntity));
+            // Wait until load balancing is complete
+            waitFor(() -> isConnectionDoneLoadBalancing(connection.getId()));
+
+            // Log the distribution of data between nodes for easier 
troubleshooting in case there's a failure.
+            final ConnectionStatusEntity afterSecondDataGenerationStatusEntity 
= getConnectionStatus(connection.getId());
+            final List<NodeConnectionStatusSnapshotDTO> nodeSnapshots = 
afterSecondDataGenerationStatusEntity.getConnectionStatus().getNodeSnapshots();
+            logger.info("FlowFiles Queued Per Node:");
+            nodeSnapshots.forEach(snapshot ->
+                logger.info("{}:{} - {}", snapshot.getAddress(), 
snapshot.getApiPort(), snapshot.getStatusSnapshot().getFlowFilesQueued())
+            );
+
+            // Check if the FlowFiles are evenly distributed between the 
nodes. If so, we're done.
+            final boolean evenlyDistributed = 
isEvenlyDistributed(afterSecondDataGenerationStatusEntity);
+            if (evenlyDistributed) {
+                break;
+            }
+
+            // If there's an IOException thrown while communicating between 
the nodes, the data will be rebalanced and will go to
+            // the local partition. There's nothing we can do about that in 
this test. However, we can verify that NiFi recovers
+            // from this and continues to distribute data. To do that, we will 
stop the processor so that it can be started again
+            // (and produce more data) and we can empty the queue so that we 
know how much data to expect.
+            getNifiClient().getProcessorClient().stopProcessor(generate);
+            getClientUtil().emptyQueue(connection.getId());
+        }
 
         assertEquals(20, getQueueSize(connection.getId()));
         assertEquals(20 * 1024 * 1024, getQueueBytes(connection.getId()));
diff --git 
a/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/clustered/node1/nifi.properties
 
b/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/clustered/node1/nifi.properties
index 4544f2fc3b..049ac178ef 100644
--- 
a/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/clustered/node1/nifi.properties
+++ 
b/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/clustered/node1/nifi.properties
@@ -77,7 +77,7 @@ 
nifi.content.repository.implementation=org.apache.nifi.controller.repository.Fil
 nifi.content.claim.max.appendable.size=50 KB
 nifi.content.repository.directory.default=./content_repository
 nifi.content.repository.archive.max.retention.period=12 hours
-nifi.content.repository.archive.max.usage.percentage=50%
+nifi.content.repository.archive.max.usage.percentage=90%
 nifi.content.repository.archive.enabled=true
 nifi.content.repository.always.sync=false
 nifi.content.viewer.url=../nifi-content-viewer/
diff --git 
a/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/clustered/node2/nifi.properties
 
b/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/clustered/node2/nifi.properties
index acd5c6707c..4b7f644058 100644
--- 
a/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/clustered/node2/nifi.properties
+++ 
b/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/clustered/node2/nifi.properties
@@ -77,7 +77,7 @@ 
nifi.content.repository.implementation=org.apache.nifi.controller.repository.Fil
 nifi.content.claim.max.appendable.size=50 KB
 nifi.content.repository.directory.default=./content_repository
 nifi.content.repository.archive.max.retention.period=12 hours
-nifi.content.repository.archive.max.usage.percentage=50%
+nifi.content.repository.archive.max.usage.percentage=90%
 nifi.content.repository.archive.enabled=true
 nifi.content.repository.always.sync=false
 nifi.content.viewer.url=../nifi-content-viewer/
diff --git 
a/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/default/nifi.properties
 
b/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/default/nifi.properties
index 7b3de1452f..7c6426c67b 100644
--- 
a/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/default/nifi.properties
+++ 
b/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/default/nifi.properties
@@ -77,7 +77,7 @@ 
nifi.content.repository.implementation=org.apache.nifi.controller.repository.Fil
 nifi.content.claim.max.appendable.size=50 KB
 nifi.content.repository.directory.default=./content_repository
 nifi.content.repository.archive.max.retention.period=12 hours
-nifi.content.repository.archive.max.usage.percentage=50%
+nifi.content.repository.archive.max.usage.percentage=90%
 nifi.content.repository.archive.enabled=true
 nifi.content.repository.always.sync=false
 nifi.content.viewer.url=../nifi-content-viewer/

Reply via email to