[ 
https://issues.apache.org/jira/browse/NIFI-6781?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Marc Parisi updated NIFI-6781:
------------------------------
    Summary: ThreadPoolRequestReplicator appears to replicate to itself (was: ThreadPoolRequestReplicator)

> ThreadPoolRequestReplicator appears to replicate to itself
> ----------------------------------------------------------
>
>                 Key: NIFI-6781
>                 URL: https://issues.apache.org/jira/browse/NIFI-6781
>             Project: Apache NiFi
>          Issue Type: Improvement
>          Components: Core Framework
>            Reporter: Marc Parisi
>            Assignee: Marc Parisi
>            Priority: Minor
>
> I've noticed that replication is attempted against the local node. I tested a 
> simple change to exclude the local node; however, I suspect that either this 
> is not a big deal, I've missed something, or the cluster state does not 
> include the local identifier. All tests allow for local instances with 
> different ports, which implies that pruning is potentially unnecessary or 
> incorrect. Therefore I've filed this as an "Improvement" while I dive further 
> into the code to validate my change. If anyone has an immediate answer 
> regarding this code, I'm happy to close this as OBE.
>  
> {code:java}
> --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
> +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
> @@ -217,11 +217,17 @@ public class ThreadPoolRequestReplicator implements RequestReplicator {
>              }
>          }
>  
> -        final List<NodeIdentifier> nodeIds = stateMap.get(NodeConnectionState.CONNECTED);
> +        // get nodes that do not match this node.
> +        final List<NodeIdentifier> nodeIds = stateMap.get(NodeConnectionState.CONNECTED).stream().filter(x -> {
> +            return clusterCoordinator.getLocalNodeIdentifier() == null || x != clusterCoordinator.getLocalNodeIdentifier();
> +        }).collect(Collectors.toList());
> +
>          if (nodeIds == null || nodeIds.isEmpty()) {
>              throw new NoConnectedNodesException();
>          }
>  
> +        logger.debug("Attempting to replicate to {} nodes", nodeIds.size());
> {code}
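
One point to check when validating a change like the one above: in Java, `!=` on objects compares references, so the local identifier would only be pruned if the cluster state holds the very same instance. The sketch below illustrates the same filter using equals() and tolerating a missing CONNECTED entry. NodeId and LocalNodeFilter are hypothetical stand-ins written for this illustration, not NiFi classes, and the sketch assumes the real NodeIdentifier provides value-based equals(). It is not the committed fix.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

/** Hypothetical stand-in for NiFi's NodeIdentifier; holds only host and port. */
final class NodeId {
    private final String host;
    private final int port;

    NodeId(String host, int port) {
        this.host = Objects.requireNonNull(host, "host");
        this.port = port;
    }

    // Value equality: two instances with the same host and port are equal.
    @Override
    public boolean equals(Object other) {
        if (this == other) {
            return true;
        }
        if (!(other instanceof NodeId)) {
            return false;
        }
        NodeId that = (NodeId) other;
        return port == that.port && host.equals(that.host);
    }

    @Override
    public int hashCode() {
        return Objects.hash(host, port);
    }
}

/** Hypothetical helper showing the pruning step in isolation. */
final class LocalNodeFilter {
    private LocalNodeFilter() {
    }

    /**
     * Returns the connected nodes without the local one.
     * A null connected list yields an empty list; a null local
     * identifier means nothing is pruned.
     */
    static List<NodeId> excludeLocal(List<NodeId> connected, NodeId local) {
        if (connected == null) {
            return new ArrayList<>();
        }
        if (local == null) {
            return new ArrayList<>(connected);
        }
        // equals() rather than != so that an equal but distinct instance is pruned.
        return connected.stream()
                .filter(id -> !local.equals(id))
                .collect(Collectors.toList());
    }
}
```

With a connected list of localhost:8080 and localhost:8081 and a local identifier of localhost:8080, excludeLocal returns only the 8081 entry, even when the local identifier is a separate instance from the one in the list. Two local instances on different ports remain distinct, which matches the test setups mentioned in the description.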



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
