This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 21503f6353 NIFI-10362: When asynchronous node disconnect is issued, do 
not send disconnect to node if the node becomes reconnected in the interim. 
Also, addressed the issue in which a disconnected node acts on a replicated 
request during the first phase by detect that it's the first phase if 
configured for cluster, not when only when connected to a cluster.
21503f6353 is described below

commit 21503f6353c33063b7acff5915a94397aad72926
Author: Mark Payne <[email protected]>
AuthorDate: Wed Aug 17 09:38:34 2022 -0400

    NIFI-10362: When asynchronous node disconnect is issued, do not send 
disconnect to node if the node becomes reconnected in the interim. Also, 
addressed the issue in which a disconnected node acts on a replicated request 
during the first phase by detect that it's the first phase if configured for 
cluster, not when only when connected to a cluster.
    
    This closes #6308
    
    Signed-off-by: David Handermann <[email protected]>
---
 .../cluster/coordination/node/NodeClusterCoordinator.java   | 13 +++++++++++++
 .../java/org/apache/nifi/web/api/ApplicationResource.java   |  2 +-
 .../nifi/tests/system/clustering/FlowSynchronizationIT.java |  2 ++
 3 files changed, 16 insertions(+), 1 deletion(-)

diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
index 19678ed1db..ccee9bb74c 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
@@ -989,6 +989,19 @@ public class NodeClusterCoordinator implements 
ClusterCoordinator, ProtocolHandl
 
                 Exception lastException = null;
                 for (int i = 0; i < attempts; i++) {
+                    // If the node is restarted, it will attempt to reconnect. 
In that case, we don't want to disconnect the node
+                    // again. So we instead log the fact that the state has 
now transitioned to this point and consider the task completed.
+                    final NodeConnectionState currentConnectionState = 
getConnectionState(nodeId);
+                    if (currentConnectionState == 
NodeConnectionState.CONNECTING || currentConnectionState == 
NodeConnectionState.CONNECTED) {
+                        reportEvent(nodeId, Severity.INFO, String.format(
+                            "State of Node %s has now transitioned from 
DISCONNECTED to %s so will no longer attempt to notify node that it is 
disconnected.", nodeId, currentConnectionState));
+                        future.completeExceptionally(new 
IllegalStateException("Node was marked as disconnected but its state 
transitioned from DISCONNECTED back to " + currentConnectionState +
+                            " before the node could be notified. This 
typically indicates that the node was restarted."));
+
+                        return;
+                    }
+
+                    // Try to send disconnect notice to the node
                     try {
                         senderListener.disconnect(request);
                         reportEvent(nodeId, Severity.INFO, "Node disconnected 
due to " + request.getExplanation());
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java
index ffde6d9683..8006caa6ac 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java
@@ -371,7 +371,7 @@ public abstract class ApplicationResource {
      */
     protected boolean isTwoPhaseRequest(final HttpServletRequest 
httpServletRequest) {
         final String transactionId = 
httpServletRequest.getHeader(RequestReplicator.REQUEST_TRANSACTION_ID_HEADER);
-        return transactionId != null && isConnectedToCluster();
+        return transactionId != null && isClustered();
     }
 
     /**
diff --git 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/FlowSynchronizationIT.java
 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/FlowSynchronizationIT.java
index 1ead90a772..a6da0107ed 100644
--- 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/FlowSynchronizationIT.java
+++ 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/FlowSynchronizationIT.java
@@ -245,6 +245,7 @@ public class FlowSynchronizationIT extends NiFiSystemIT {
         getClientUtil().enableControllerService(countService);
         getClientUtil().enableControllerService(sleepService);
         getClientUtil().startReportingTask(reportingTask);
+        getClientUtil().waitForValidProcessor(count.getId()); // Now that 
service was enabled, wait for processor to become valid.
         getClientUtil().startProcessGroupComponents(group.getId());
         getClientUtil().startProcessor(terminate);
         getClientUtil().startProcessor(generate);
@@ -603,6 +604,7 @@ public class FlowSynchronizationIT extends NiFiSystemIT {
         getClientUtil().enableControllerService(countService);
         getClientUtil().enableControllerService(sleepService);
         getClientUtil().startReportingTask(reportingTask);
+        getClientUtil().waitForValidProcessor(count.getId()); // Now that 
service was enabled, wait for processor to become valid.
         getClientUtil().startProcessGroupComponents(group.getId());
         getClientUtil().startProcessor(terminate);
         getClientUtil().startProcessor(generate);

Reply via email to