Repository: nifi Updated Branches: refs/heads/master 768bcfb50 -> c87d79193
NIFI-5663: Ensure that when we sort Node Identifiers, we use both the node's API Address and API Port, in case 2 nodes are running on the same host. Also ensure that when the Local Node ID is determined, we update all Load Balancing Partitions, if necessary. This closes #3048. Signed-off-by: Koji Kawamura <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/c87d7919 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/c87d7919 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/c87d7919 Branch: refs/heads/master Commit: c87d791938562de04ee598ebffa296f954130ca7 Parents: 768bcfb Author: Mark Payne <[email protected]> Authored: Fri Oct 5 12:06:39 2018 -0400 Committer: Koji Kawamura <[email protected]> Committed: Tue Oct 9 21:14:31 2018 +0900 ---------------------------------------------------------------------- .../clustered/SocketLoadBalancedFlowFileQueue.java | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/c87d7919/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java index f250200..193a961 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java +++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java @@ -571,7 +571,7 @@ public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue imple // Re-define 'queuePartitions' array final List<NodeIdentifier> sortedNodeIdentifiers = new ArrayList<>(updatedNodeIdentifiers); - sortedNodeIdentifiers.sort(Comparator.comparing(NodeIdentifier::getApiAddress)); + sortedNodeIdentifiers.sort(Comparator.comparing(nodeId -> nodeId.getApiAddress() + ":" + nodeId.getApiPort())); final QueuePartition[] updatedQueuePartitions; if (sortedNodeIdentifiers.isEmpty()) { @@ -990,6 +990,14 @@ public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue imple return; } + if (!nodeIdentifiers.contains(localNodeId)) { + final Set<NodeIdentifier> updatedNodeIds = new HashSet<>(nodeIdentifiers); + updatedNodeIds.add(localNodeId); + + logger.debug("Local Node Identifier has now been determined to be {}. Adding to set of Node Identifiers for {}", localNodeId, SocketLoadBalancedFlowFileQueue.this); + setNodeIdentifiers(updatedNodeIds, false); + } + logger.debug("Local Node Identifier set to {}; current partitions = {}", localNodeId, queuePartitions); for (final QueuePartition partition : queuePartitions) { @@ -1009,7 +1017,9 @@ public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue imple logger.debug("{} Local Node Identifier set to {} and found Queue Partition {} with that Node Identifier. Will force update of partitions", SocketLoadBalancedFlowFileQueue.this, localNodeId, partition); - setNodeIdentifiers(SocketLoadBalancedFlowFileQueue.this.nodeIdentifiers, true); + final Set<NodeIdentifier> updatedNodeIds = new HashSet<>(nodeIdentifiers); + updatedNodeIds.add(localNodeId); + setNodeIdentifiers(updatedNodeIds, true); return; } }
