[ 
https://issues.apache.org/jira/browse/NIFI-6781?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Marc Parisi updated NIFI-6781:
------------------------------
    Description: 
I've noticed that replication is attempted locally. I tested a simple change to 
eliminate the local node; however, I suspect this is not a big deal or I've 
missed something or the cluster states does not include the local identifier. 
All tests allow for local instances with different ports, implying that pruning 
is potentially unnecessary or incorrect logic. Therefore I've created this as a 
an "Improvement" as I dive further into the code to validate my change. If 
anyone has the immediate answer regarding this code I'm happy to close this as 
OBE.

 

This is the local, partial change – currently in the process of validating that 
this actually solves the issue; however, the overarching question of "Is this a 
bad idea?" is something I'd love an answer on as I dive into this code. 
{code:java}
 --- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
@@ -217,11 +217,17 @@ public class ThreadPoolRequestReplicator implements 
RequestReplicator {
             }
         }
 
-        final List<NodeIdentifier> nodeIds = 
stateMap.get(NodeConnectionState.CONNECTED);
+        // get nodes that do not match this node.
+        final List<NodeIdentifier> nodeIds = 
stateMap.get(NodeConnectionState.CONNECTED).stream().filter(x -> {
+            return clusterCoordinator.getLocalNodeIdentifier() == null || x != 
clusterCoordinator.getLocalNodeIdentifier();
+        }).collect(Collectors.toList());
+
         if (nodeIds == null || nodeIds.isEmpty()) {
             throw new NoConnectedNodesException();
         }
 
+        logger.debug("Attempting to replicate to {} nodes", nodeIds.size());
{code}

  was:
I've noticed that replication is attempted locally. I tested a simple change to 
eliminate the local node; however, I suspect this is not a big deal or I've 
missed something or the cluster states does not include the local identifier. 
All tests allow for local instances with different ports, implying that pruning 
is potentially unnecessary or incorrect logic. Therefore I've created this as a 
an "Improvement" as I dive further into the code to validate my change. If 
anyone has the immediate answer regarding this code I'm happy to close this as 
OBE.

 

This is the local, partial change. 
{code:java}
 --- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
@@ -217,11 +217,17 @@ public class ThreadPoolRequestReplicator implements 
RequestReplicator {
             }
         }
 
-        final List<NodeIdentifier> nodeIds = 
stateMap.get(NodeConnectionState.CONNECTED);
+        // get nodes that do not match this node.
+        final List<NodeIdentifier> nodeIds = 
stateMap.get(NodeConnectionState.CONNECTED).stream().filter(x -> {
+            return clusterCoordinator.getLocalNodeIdentifier() == null || x != 
clusterCoordinator.getLocalNodeIdentifier();
+        }).collect(Collectors.toList());
+
         if (nodeIds == null || nodeIds.isEmpty()) {
             throw new NoConnectedNodesException();
         }
 
+        logger.debug("Attempting to replicate to {} nodes", nodeIds.size());
{code}


> ThreadPoolRequestReplicator appears to replicate to itself
> ----------------------------------------------------------
>
>                 Key: NIFI-6781
>                 URL: https://issues.apache.org/jira/browse/NIFI-6781
>             Project: Apache NiFi
>          Issue Type: Improvement
>          Components: Core Framework
>            Reporter: Marc Parisi
>            Assignee: Marc Parisi
>            Priority: Minor
>
> I've noticed that replication is attempted locally. I tested a simple change 
> to eliminate the local node; however, I suspect this is not a big deal or 
> I've missed something or the cluster states does not include the local 
> identifier. All tests allow for local instances with different ports, 
> implying that pruning is potentially unnecessary or incorrect logic. 
> Therefore I've created this as a an "Improvement" as I dive further into the 
> code to validate my change. If anyone has the immediate answer regarding this 
> code I'm happy to close this as OBE.
>  
> This is the local, partial change – currently in the process of validating 
> that this actually solves the issue; however, the overarching question of "Is 
> this a bad idea?" is something I'd love an answer on as I dive into this 
> code. 
> {code:java}
>  --- 
> a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
> +++ 
> b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
> @@ -217,11 +217,17 @@ public class ThreadPoolRequestReplicator implements 
> RequestReplicator {
>              }
>          }
>  
> -        final List<NodeIdentifier> nodeIds = 
> stateMap.get(NodeConnectionState.CONNECTED);
> +        // get nodes that do not match this node.
> +        final List<NodeIdentifier> nodeIds = 
> stateMap.get(NodeConnectionState.CONNECTED).stream().filter(x -> {
> +            return clusterCoordinator.getLocalNodeIdentifier() == null || x 
> != clusterCoordinator.getLocalNodeIdentifier();
> +        }).collect(Collectors.toList());
> +
>          if (nodeIds == null || nodeIds.isEmpty()) {
>              throw new NoConnectedNodesException();
>          }
>  
> +        logger.debug("Attempting to replicate to {} nodes", nodeIds.size());
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to