[
https://issues.apache.org/jira/browse/NIFI-6781?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Marc Parisi updated NIFI-6781:
------------------------------
Summary: ThreadPoolRequestReplicator appears to replicate to itself (was:
ThreadPoolRequestReplicator)
> ThreadPoolRequestReplicator appears to replicate to itself
> ----------------------------------------------------------
>
> Key: NIFI-6781
> URL: https://issues.apache.org/jira/browse/NIFI-6781
> Project: Apache NiFi
> Issue Type: Improvement
> Components: Core Framework
> Reporter: Marc Parisi
> Assignee: Marc Parisi
> Priority: Minor
>
> I've noticed that replication is attempted locally. I tested a simple change
> to eliminate the local node; however, I suspect this is not a big deal or
> I've missed something or the cluster states does not include the local
> identifier. All tests allow for local instances with different ports,
> implying that pruning is potentially unnecessary or incorrect logic.
> Therefore I've created this as a an "Improvement" as I dive further into the
> code to validate my change. If anyone has the immediate answer regarding this
> code I'm happy to close this as OBE.
>
> {code:java}
> ---
> a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
> +++
> b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
> @@ -217,11 +217,17 @@ public class ThreadPoolRequestReplicator implements
> RequestReplicator {
> }
> }
>
> - final List<NodeIdentifier> nodeIds =
> stateMap.get(NodeConnectionState.CONNECTED);
> + // get nodes that do not match this node.
> + final List<NodeIdentifier> nodeIds =
> stateMap.get(NodeConnectionState.CONNECTED).stream().filter(x -> {
> + return clusterCoordinator.getLocalNodeIdentifier() == null || x
> != clusterCoordinator.getLocalNodeIdentifier();
> + }).collect(Collectors.toList());
> +
> if (nodeIds == null || nodeIds.isEmpty()) {
> throw new NoConnectedNodesException();
> }
>
> + logger.debug("Attempting to replicate to {} nodes", nodeIds.size());
> {code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)