Repository: nifi
Updated Branches:
  refs/heads/master 83ca67649 -> 97afa4e7b


NIFI-5585: Fixed bug that arose when multiple nodes were decommissioning at 
the same time; the queues could get into a state where the nodes queued up data 
for one another, so the data just stayed put


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/be2c24cf
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/be2c24cf
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/be2c24cf

Branch: refs/heads/master
Commit: be2c24cfaf00f74ba9dfa1b5cf04aa43ee818afb
Parents: 04d8da8
Author: Mark Payne <[email protected]>
Authored: Mon Sep 24 09:17:22 2018 -0400
Committer: Mark Payne <[email protected]>
Committed: Thu Oct 11 09:23:00 2018 -0400

----------------------------------------------------------------------
 .../ClusterTopologyEventListener.java           |   5 +-
 .../node/NodeClusterCoordinator.java            |  15 ++-
 .../SocketLoadBalancedFlowFileQueue.java        | 130 +++++++++++++------
 .../partition/NonLocalPartitionPartitioner.java |   2 +-
 4 files changed, 101 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/be2c24cf/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterTopologyEventListener.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterTopologyEventListener.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterTopologyEventListener.java
index ad9be3d..d31339b 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterTopologyEventListener.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterTopologyEventListener.java
@@ -17,15 +17,16 @@
 
 package org.apache.nifi.cluster.coordination;
 
+import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
 import org.apache.nifi.cluster.protocol.NodeIdentifier;
 
 public interface ClusterTopologyEventListener {
 
     void onNodeAdded(NodeIdentifier nodeId);
 
-    void onNodeOffloaded(NodeIdentifier nodeId);
-
     void onNodeRemoved(NodeIdentifier nodeId);
 
     void onLocalNodeIdentifierSet(NodeIdentifier localNodeId);
+
+    void onNodeStateChange(NodeIdentifier nodeId, NodeConnectionState 
newState);
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/be2c24cf/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
index e165041..5b90e76 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
@@ -341,6 +341,8 @@ public class NodeClusterCoordinator implements 
ClusterCoordinator, ProtocolHandl
         final NodeConnectionStatus evictedStatus = nodeStatuses.put(nodeId, 
updatedStatus);
         if (evictedStatus == null) {
             onNodeAdded(nodeId, storeState);
+        } else {
+            onNodeStateChange(nodeId, updatedStatus.getState());
         }
 
         return evictedStatus;
@@ -359,6 +361,10 @@ public class NodeClusterCoordinator implements 
ClusterCoordinator, ProtocolHandl
             updated = nodeStatuses.replace(nodeId, expectedStatus, 
updatedStatus);
         }
 
+        if (updated) {
+            onNodeStateChange(nodeId, updatedStatus.getState());
+        }
+
         return updated;
     }
 
@@ -511,7 +517,6 @@ public class NodeClusterCoordinator implements 
ClusterCoordinator, ProtocolHandl
         request.setExplanation(explanation);
 
         addNodeEvent(nodeId, "Offload requested due to " + explanation);
-        onNodeOffloaded(nodeId);
         offloadAsynchronously(request, 10, 5);
     }
 
@@ -572,10 +577,6 @@ public class NodeClusterCoordinator implements 
ClusterCoordinator, ProtocolHandl
         storeState();
     }
 
-    private void onNodeOffloaded(final NodeIdentifier nodeId) {
-        eventListeners.forEach(listener -> listener.onNodeOffloaded(nodeId));
-    }
-
     private void onNodeRemoved(final NodeIdentifier nodeId) {
         eventListeners.forEach(listener -> listener.onNodeRemoved(nodeId));
     }
@@ -587,6 +588,10 @@ public class NodeClusterCoordinator implements 
ClusterCoordinator, ProtocolHandl
         eventListeners.forEach(listener -> listener.onNodeAdded(nodeId));
     }
 
+    private void onNodeStateChange(final NodeIdentifier nodeId, final 
NodeConnectionState nodeConnectionState) {
+        eventListeners.forEach(listener -> listener.onNodeStateChange(nodeId, 
nodeConnectionState));
+    }
+
     @Override
     public NodeConnectionStatus getConnectionStatus(final NodeIdentifier 
nodeId) {
         return nodeStatuses.get(nodeId);

http://git-wip-us.apache.org/repos/asf/nifi/blob/be2c24cf/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java
index 4c9188b..e99d17d 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java
@@ -19,6 +19,8 @@ package org.apache.nifi.controller.queue.clustered;
 
 import org.apache.nifi.cluster.coordination.ClusterCoordinator;
 import org.apache.nifi.cluster.coordination.ClusterTopologyEventListener;
+import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
+import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
 import org.apache.nifi.cluster.protocol.NodeIdentifier;
 import org.apache.nifi.controller.ProcessScheduler;
 import org.apache.nifi.controller.queue.AbstractFlowFileQueue;
@@ -114,7 +116,7 @@ public class SocketLoadBalancedFlowFileQueue extends 
AbstractFlowFileQueue imple
     private QueuePartition[] queuePartitions;
     private FlowFilePartitioner partitioner;
     private boolean stopped = true;
-    private boolean offloaded = false;
+    private volatile boolean offloaded = false;
 
 
     public SocketLoadBalancedFlowFileQueue(final String identifier, final 
ConnectionEventListener eventListener, final ProcessScheduler scheduler, final 
FlowFileRepository flowFileRepo,
@@ -184,26 +186,28 @@ public class SocketLoadBalancedFlowFileQueue extends 
AbstractFlowFileQueue imple
             return;
         }
 
-        // We are already load balancing but are changing how we are load 
balancing.
-        final FlowFilePartitioner partitioner;
-        switch (strategy) {
-            case DO_NOT_LOAD_BALANCE:
-                partitioner = new LocalPartitionPartitioner();
-                break;
-            case PARTITION_BY_ATTRIBUTE:
-                partitioner = new 
CorrelationAttributePartitioner(partitioningAttribute);
-                break;
-            case ROUND_ROBIN:
-                partitioner = new RoundRobinPartitioner();
-                break;
-            case SINGLE_NODE:
-                partitioner = new FirstNodePartitioner();
-                break;
-            default:
-                throw new IllegalArgumentException();
-        }
+        if (!offloaded) {
+            // We are already load balancing but are changing how we are load 
balancing.
+            final FlowFilePartitioner partitioner;
+            switch (strategy) {
+                case DO_NOT_LOAD_BALANCE:
+                    partitioner = new LocalPartitionPartitioner();
+                    break;
+                case PARTITION_BY_ATTRIBUTE:
+                    partitioner = new 
CorrelationAttributePartitioner(partitioningAttribute);
+                    break;
+                case ROUND_ROBIN:
+                    partitioner = new RoundRobinPartitioner();
+                    break;
+                case SINGLE_NODE:
+                    partitioner = new FirstNodePartitioner();
+                    break;
+                default:
+                    throw new IllegalArgumentException();
+            }
 
-        setFlowFilePartitioner(partitioner);
+            setFlowFilePartitioner(partitioner);
+        }
     }
 
     @Override
@@ -215,8 +219,33 @@ public class SocketLoadBalancedFlowFileQueue extends 
AbstractFlowFileQueue imple
 
         offloaded = true;
 
-        // TODO need to be able to reset the partitioner to the previous 
partitioner if this node is reconnected to the cluster
-        setFlowFilePartitioner(new NonLocalPartitionPartitioner());
+        partitionWriteLock.lock();
+        try {
+            final Set<NodeIdentifier> nodesToKeep = new HashSet<>();
+
+            // If we have any nodes that are connected, we only want to send 
data to the connected nodes.
+            for (final QueuePartition partition : queuePartitions) {
+                final Optional<NodeIdentifier> nodeIdOption = 
partition.getNodeIdentifier();
+                if (!nodeIdOption.isPresent()) {
+                    continue;
+                }
+
+                final NodeIdentifier nodeId = nodeIdOption.get();
+                final NodeConnectionStatus status = 
clusterCoordinator.getConnectionStatus(nodeId);
+                if (status != null && status.getState() == 
NodeConnectionState.CONNECTED) {
+                    nodesToKeep.add(nodeId);
+                }
+            }
+
+            if (!nodesToKeep.isEmpty()) {
+                setNodeIdentifiers(nodesToKeep, false);
+            }
+
+            // Update our partitioner so that we don't keep any data on the 
local partition
+            setFlowFilePartitioner(new NonLocalPartitionPartitioner());
+        } finally {
+            partitionWriteLock.unlock();
+        }
     }
 
     public synchronized void startLoadBalancing() {
@@ -566,11 +595,6 @@ public class SocketLoadBalancedFlowFileQueue extends 
AbstractFlowFileQueue imple
                 return;
             }
 
-            if (offloaded) {
-                logger.debug("{} Not going to rebalance Queue even though 
setNodeIdentifiers was called, because the queue has been offloaded", this);
-                return;
-            }
-
             logger.debug("{} Stopping the {} queue partitions in order to 
change node identifiers from {} to {}", this, queuePartitions.length, 
this.nodeIdentifiers, updatedNodeIdentifiers);
             for (final QueuePartition queuePartition : queuePartitions) {
                 queuePartition.stop();
@@ -593,7 +617,7 @@ public class SocketLoadBalancedFlowFileQueue extends 
AbstractFlowFileQueue imple
             final List<NodeIdentifier> sortedNodeIdentifiers = new 
ArrayList<>(updatedNodeIdentifiers);
             sortedNodeIdentifiers.sort(Comparator.comparing(nodeId -> 
nodeId.getApiAddress() + ":" + nodeId.getApiPort()));
 
-            final QueuePartition[] updatedQueuePartitions;
+            QueuePartition[] updatedQueuePartitions;
             if (sortedNodeIdentifiers.isEmpty()) {
                 updatedQueuePartitions = new QueuePartition[] { localPartition 
};
             } else {
@@ -601,10 +625,12 @@ public class SocketLoadBalancedFlowFileQueue extends 
AbstractFlowFileQueue imple
             }
 
             // Populate the new QueuePartitions.
+            boolean localPartitionIncluded = false;
             for (int i = 0; i < sortedNodeIdentifiers.size(); i++) {
                 final NodeIdentifier nodeId = sortedNodeIdentifiers.get(i);
                 if 
(nodeId.equals(clusterCoordinator.getLocalNodeIdentifier())) {
                     updatedQueuePartitions[i] = localPartition;
+                    localPartitionIncluded = true;
 
                     // If we have RemoteQueuePartition with this Node ID with 
data, that data must be migrated to the local partition.
                     // This can happen if we didn't previously know our Node 
UUID.
@@ -622,6 +648,13 @@ public class SocketLoadBalancedFlowFileQueue extends 
AbstractFlowFileQueue imple
                 updatedQueuePartitions[i] = existingPartition == null ? 
createRemotePartition(nodeId) : existingPartition;
             }
 
+            if (!localPartitionIncluded) {
+                final QueuePartition[] withLocal = new 
QueuePartition[updatedQueuePartitions.length + 1];
+                System.arraycopy(updatedQueuePartitions, 0, withLocal, 0, 
updatedQueuePartitions.length);
+                withLocal[withLocal.length - 1] = localPartition;
+                updatedQueuePartitions = withLocal;
+            }
+
             // If the partition requires that all partitions be re-balanced 
when the number of partitions changes, then do so.
             // Otherwise, just rebalance the data from any Partitions that 
were removed, if any.
             if (partitioner.isRebalanceOnClusterResize()) {
@@ -669,6 +702,7 @@ public class SocketLoadBalancedFlowFileQueue extends 
AbstractFlowFileQueue imple
     }
 
     protected void rebalance(final QueuePartition partition) {
+        logger.debug("Rebalancing Partition {}", partition);
         final FlowFileQueueContents contents = 
partition.packageForRebalance(rebalancingPartition.getSwapPartitionName());
         rebalancingPartition.rebalance(contents);
     }
@@ -989,25 +1023,14 @@ public class SocketLoadBalancedFlowFileQueue extends 
AbstractFlowFileQueue imple
         }
 
         @Override
-        public void onNodeOffloaded(final NodeIdentifier nodeId) {
-            partitionWriteLock.lock();
-            try {
-                final Set<NodeIdentifier> updatedNodeIds = new 
HashSet<>(nodeIdentifiers);
-                updatedNodeIds.remove(nodeId);
-
-                logger.debug("Node Identifier {} offloaded. Node ID's changing 
from {} to {}", nodeId, nodeIdentifiers, updatedNodeIds);
-                setNodeIdentifiers(updatedNodeIds, false);
-            } finally {
-                partitionWriteLock.unlock();
-            }
-        }
-
-        @Override
         public void onNodeRemoved(final NodeIdentifier nodeId) {
             partitionWriteLock.lock();
             try {
                 final Set<NodeIdentifier> updatedNodeIds = new 
HashSet<>(nodeIdentifiers);
-                updatedNodeIds.remove(nodeId);
+                final boolean removed = updatedNodeIds.remove(nodeId);
+                if (!removed) {
+                    return;
+                }
 
                 logger.debug("Node Identifier {} removed from cluster. Node 
ID's changing from {} to {}", nodeId, nodeIdentifiers, updatedNodeIds);
                 setNodeIdentifiers(updatedNodeIds, false);
@@ -1063,6 +1086,27 @@ public class SocketLoadBalancedFlowFileQueue extends 
AbstractFlowFileQueue imple
                 partitionWriteLock.unlock();
             }
         }
+
+        @Override
+        public void onNodeStateChange(final NodeIdentifier nodeId, final 
NodeConnectionState newState) {
+            partitionWriteLock.lock();
+            try {
+                if (!offloaded) {
+                    return;
+                }
+
+                switch (newState) {
+                    case OFFLOADED:
+                    case OFFLOADING:
+                    case DISCONNECTED:
+                    case DISCONNECTING:
+                        onNodeRemoved(nodeId);
+                        break;
+                }
+            } finally {
+                partitionWriteLock.unlock();
+            }
+        }
     }
 }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/be2c24cf/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/NonLocalPartitionPartitioner.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/NonLocalPartitionPartitioner.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/NonLocalPartitionPartitioner.java
index cffaefd..0953ce2 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/NonLocalPartitionPartitioner.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/NonLocalPartitionPartitioner.java
@@ -47,7 +47,7 @@ public class NonLocalPartitionPartitioner implements 
FlowFilePartitioner {
 
     @Override
     public boolean isRebalanceOnClusterResize() {
-        return false;
+        return true;
     }
 
 

Reply via email to