This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 21503f6353 NIFI-10362: When asynchronous node disconnect is issued, do
not send disconnect to node if the node becomes reconnected in the interim.
Also, addressed the issue in which a disconnected node acts on a replicated
request during the first phase by detecting that it's the first phase if
configured for cluster, not only when connected to a cluster.
21503f6353 is described below
commit 21503f6353c33063b7acff5915a94397aad72926
Author: Mark Payne <[email protected]>
AuthorDate: Wed Aug 17 09:38:34 2022 -0400
NIFI-10362: When asynchronous node disconnect is issued, do not send
disconnect to node if the node becomes reconnected in the interim. Also,
addressed the issue in which a disconnected node acts on a replicated request
during the first phase by detecting that it's the first phase if configured for
cluster, not only when connected to a cluster.
This closes #6308
Signed-off-by: David Handermann <[email protected]>
---
.../cluster/coordination/node/NodeClusterCoordinator.java | 13 +++++++++++++
.../java/org/apache/nifi/web/api/ApplicationResource.java | 2 +-
.../nifi/tests/system/clustering/FlowSynchronizationIT.java | 2 ++
3 files changed, 16 insertions(+), 1 deletion(-)
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
index 19678ed1db..ccee9bb74c 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
@@ -989,6 +989,19 @@ public class NodeClusterCoordinator implements
ClusterCoordinator, ProtocolHandl
Exception lastException = null;
for (int i = 0; i < attempts; i++) {
+ // If the node is restarted, it will attempt to reconnect.
In that case, we don't want to disconnect the node
+ // again. So we instead log the fact that the state has
now transitioned to this point and consider the task completed.
+ final NodeConnectionState currentConnectionState =
getConnectionState(nodeId);
+ if (currentConnectionState ==
NodeConnectionState.CONNECTING || currentConnectionState ==
NodeConnectionState.CONNECTED) {
+ reportEvent(nodeId, Severity.INFO, String.format(
+ "State of Node %s has now transitioned from
DISCONNECTED to %s so will no longer attempt to notify node that it is
disconnected.", nodeId, currentConnectionState));
+ future.completeExceptionally(new
IllegalStateException("Node was marked as disconnected but its state
transitioned from DISCONNECTED back to " + currentConnectionState +
+ " before the node could be notified. This
typically indicates that the node was restarted."));
+
+ return;
+ }
+
+ // Try to send disconnect notice to the node
try {
senderListener.disconnect(request);
reportEvent(nodeId, Severity.INFO, "Node disconnected
due to " + request.getExplanation());
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java
index ffde6d9683..8006caa6ac 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ApplicationResource.java
@@ -371,7 +371,7 @@ public abstract class ApplicationResource {
*/
protected boolean isTwoPhaseRequest(final HttpServletRequest
httpServletRequest) {
final String transactionId =
httpServletRequest.getHeader(RequestReplicator.REQUEST_TRANSACTION_ID_HEADER);
- return transactionId != null && isConnectedToCluster();
+ return transactionId != null && isClustered();
}
/**
diff --git
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/FlowSynchronizationIT.java
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/FlowSynchronizationIT.java
index 1ead90a772..a6da0107ed 100644
---
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/FlowSynchronizationIT.java
+++
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/FlowSynchronizationIT.java
@@ -245,6 +245,7 @@ public class FlowSynchronizationIT extends NiFiSystemIT {
getClientUtil().enableControllerService(countService);
getClientUtil().enableControllerService(sleepService);
getClientUtil().startReportingTask(reportingTask);
+ getClientUtil().waitForValidProcessor(count.getId()); // Now that
service was enabled, wait for processor to become valid.
getClientUtil().startProcessGroupComponents(group.getId());
getClientUtil().startProcessor(terminate);
getClientUtil().startProcessor(generate);
@@ -603,6 +604,7 @@ public class FlowSynchronizationIT extends NiFiSystemIT {
getClientUtil().enableControllerService(countService);
getClientUtil().enableControllerService(sleepService);
getClientUtil().startReportingTask(reportingTask);
+ getClientUtil().waitForValidProcessor(count.getId()); // Now that
service was enabled, wait for processor to become valid.
getClientUtil().startProcessGroupComponents(group.getId());
getClientUtil().startProcessor(terminate);
getClientUtil().startProcessor(generate);