Marc Parisi created NIFI-6781:
---------------------------------
Summary: ThreadPoolRequestReplicator
Key: NIFI-6781
URL: https://issues.apache.org/jira/browse/NIFI-6781
Project: Apache NiFi
Issue Type: Improvement
Components: Core Framework
Reporter: Marc Parisi
Assignee: Marc Parisi
I've noticed that replication is attempted locally. I tested a simple change to
eliminate the local node; however, I suspect this is not a big deal or I've
missed something or the cluster states does not include the local identifier.
All tests allow for local instances with different ports, implying that pruning
is potentially unnecessary or incorrect logic. Therefore I've created this as a
an "Improvement" as I dive further into the code to validate my change. If
anyone has the immediate answer regarding this code I'm happy to close this as
OBE.
{code:java}
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
@@ -217,11 +217,17 @@ public class ThreadPoolRequestReplicator implements
RequestReplicator {
}
}
- final List<NodeIdentifier> nodeIds =
stateMap.get(NodeConnectionState.CONNECTED);
+ // get nodes that do not match this node.
+ final List<NodeIdentifier> nodeIds =
stateMap.get(NodeConnectionState.CONNECTED).stream().filter(x -> {
+ return clusterCoordinator.getLocalNodeIdentifier() == null || x !=
clusterCoordinator.getLocalNodeIdentifier();
+ }).collect(Collectors.toList());
+
if (nodeIds == null || nodeIds.isEmpty()) {
throw new NoConnectedNodesException();
}
+ logger.debug("Attempting to replicate to {} nodes", nodeIds.size());
{code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)