NIFI-5585 A node that was previously offloaded can now be reconnected to the 
cluster and queue flowfiles again
Added Spock test for NonLocalPartitionPartitioner
Updated NOTICE files for FontAwesome with the updated version (4.7.0) and URL 
to the free license
Updated package-lock.json with the updated version of FontAwesome (4.7.0)
Added method to FlowFileQueue interface to reset an offloaded queue
Queues that are now immediately have the offloaded status reset once offloading 
finishes
SocketLoadBalancedFlowFileQueue now ignores back-pressure when offloading 
flowfiles
Cleaned up javascript in nf-cluster-table.js when creating markup for the node 
operation icons
Fixed incorrect handling of a heartbeat from an offloaded node.  Heartbeats 
from offloading or offloaded nodes will now be reported as an event, the 
heartbeat will be removed and ignored.
Added unit tests and integration tests to cover offloading nodes
Updated Cluster integration test class with accessor for the current cluster 
coordinator
Updated Node integration test class's custom NiFiProperties implementation to 
return the load balancing port and a method to assert an offloaded node
Added exclusion to top-level pom for ITSpec.class


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/01e2098d
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/01e2098d
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/01e2098d

Branch: refs/heads/master
Commit: 01e2098d242f45f519bee0572de3c86cc7837645
Parents: be2c24c
Author: Jeff Storck <[email protected]>
Authored: Tue Sep 25 15:17:19 2018 -0400
Committer: Mark Payne <[email protected]>
Committed: Thu Oct 11 09:23:01 2018 -0400

----------------------------------------------------------------------
 nifi-assembly/NOTICE                            |   2 +-
 .../nifi/controller/queue/FlowFileQueue.java    |  14 ++
 .../src/main/resources/META-INF/NOTICE          |   2 +-
 .../coordination/ClusterCoordinator.java        |   9 +-
 .../heartbeat/AbstractHeartbeatMonitor.java     |  12 +-
 .../ThreadPoolRequestReplicator.java            |   1 +
 .../node/NodeClusterCoordinator.java            |   3 +-
 .../node/NodeClusterCoordinatorSpec.groovy      |  99 ++++++++++++++
 .../integration/OffloadNodeITSpec.groovy        |  50 ++++++++
 .../nifi/cluster/integration/Cluster.java       |   4 +
 .../apache/nifi/cluster/integration/Node.java   |  15 +++
 .../nifi/controller/StandardFlowService.java    |   6 +-
 .../controller/queue/StandardFlowFileQueue.java |   4 +
 .../SocketLoadBalancedFlowFileQueue.java        |  72 ++++++++---
 .../controller/StandardFlowServiceSpec.groovy   | 128 +++++++++++++++++++
 .../NonLocalPartitionPartitionerSpec.groovy     | 107 ++++++++++++++++
 .../TestWriteAheadFlowFileRepository.java       |   4 +
 .../nifi/web/StandardNiFiServiceFacade.java     |   3 +-
 .../src/main/frontend/package-lock.json         |   6 +-
 .../src/main/resources/META-INF/NOTICE          |   2 +-
 .../webapp/js/nf/cluster/nf-cluster-table.js    |  41 +++---
 .../impl/command/nifi/nodes/DisconnectNode.java |   2 +-
 pom.xml                                         |   3 +
 23 files changed, 527 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/01e2098d/nifi-assembly/NOTICE
----------------------------------------------------------------------
diff --git a/nifi-assembly/NOTICE b/nifi-assembly/NOTICE
index 0bea06a..dd2bf6d 100644
--- a/nifi-assembly/NOTICE
+++ b/nifi-assembly/NOTICE
@@ -1873,4 +1873,4 @@ SIL OFL 1.1
 ******************
 
 The following binary components are provided under the SIL Open Font License 
1.1
-  (SIL OFL 1.1) FontAwesome (4.6.1 - 
http://fortawesome.github.io/Font-Awesome/license/)
+  (SIL OFL 1.1) FontAwesome (4.7.0 - https://fontawesome.com/license/free)

http://git-wip-us.apache.org/repos/asf/nifi/blob/01e2098d/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java
----------------------------------------------------------------------
diff --git 
a/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java
 
b/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java
index 7cd0e30..8870f1d 100644
--- 
a/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java
+++ 
b/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java
@@ -267,8 +267,22 @@ public interface FlowFileQueue {
 
     void setLoadBalanceStrategy(LoadBalanceStrategy strategy, String 
partitioningAttribute);
 
+    /**
+     * Offloads the flowfiles in the queue to other nodes.  This disables the 
queue from partition flowfiles locally.
+     * <p>
+     * This operation is a no-op if the node that contains this queue is not 
in a cluster.
+     */
     void offloadQueue();
 
+    /**
+     * Resets a queue that has previously been offloaded.  This allows the 
queue to partition flowfiles locally, and
+     * has no other effect on processors or remote process groups.
+     * <p>
+     * This operation is a no-op if the queue is not currently offloaded or 
the node that contains this queue is not
+     * clustered.
+     */
+    void resetOffloadedQueue();
+
     LoadBalanceStrategy getLoadBalanceStrategy();
 
     void setLoadBalanceCompression(LoadBalanceCompression compression);

http://git-wip-us.apache.org/repos/asf/nifi/blob/01e2098d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework-nar/src/main/resources/META-INF/NOTICE
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework-nar/src/main/resources/META-INF/NOTICE
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework-nar/src/main/resources/META-INF/NOTICE
index e6a7322..fae5c91 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework-nar/src/main/resources/META-INF/NOTICE
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework-nar/src/main/resources/META-INF/NOTICE
@@ -212,6 +212,6 @@ SIL OFL 1.1
 ******************
 
 The following binary components are provided under the SIL Open Font License 
1.1
-  (SIL OFL 1.1) FontAwesome (4.6.1 - 
http://fortawesome.github.io/Font-Awesome/license/)
+  (SIL OFL 1.1) FontAwesome (4.7.0 - https://fontawesome.com/license/free)
 
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/01e2098d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java
index 2ad0e70..fad51d1 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java
@@ -72,7 +72,14 @@ public interface ClusterCoordinator {
     /**
      * Sends a request to the node to be offloaded.
      * The node will be marked as offloading immediately.
-     *
+     * <p>
+     * When a node is offloaded:
+     * <ul>
+     *     <li>all processors on the node are stopped</li>
+     *     <li>all processors on the node are terminated</li>
+     *     <li>all remote process groups on the node stop transmitting</li>
+     *     <li>all flowfiles on the node are sent to other nodes in the 
cluster</li>
+     * </ul>
      * @param nodeId the identifier of the node
      * @param offloadCode the code that represents why this node is being 
asked to be offloaded
      * @param explanation an explanation as to why the node is being asked to 
be offloaded

http://git-wip-us.apache.org/repos/asf/nifi/blob/01e2098d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java
index f6d09ab..5fbe3f8 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java
@@ -228,12 +228,14 @@ public abstract class AbstractHeartbeatMonitor implements 
HeartbeatMonitor {
             return;
         }
 
-        if (NodeConnectionState.OFFLOADED == connectionState) {
-            // Cluster Coordinator believes that node is offloaded, but let 
the node reconnect
-            clusterCoordinator.reportEvent(nodeId, Severity.INFO, "Received 
heartbeat from node that is offloaded. " +
-                    "Marking as Disconnected and requesting that Node 
reconnect to cluster");
-            clusterCoordinator.requestNodeConnect(nodeId, null);
+        if (NodeConnectionState.OFFLOADED == connectionState || 
NodeConnectionState.OFFLOADING == connectionState) {
+            // Cluster Coordinator can ignore this heartbeat since the node is 
offloaded
+            clusterCoordinator.reportEvent(nodeId, Severity.INFO, "Received 
heartbeat from node that is offloading " +
+                    "or offloaded. Removing this heartbeat.  Offloaded nodes 
will only be reconnected to the cluster by an " +
+                    "explicit connection request or restarting the node.");
+            removeHeartbeat(nodeId);
         }
+
         if (NodeConnectionState.DISCONNECTED == connectionState) {
             // ignore heartbeats from nodes disconnected by means other than 
lack of heartbeat, unless it is
             // the only node. We allow it if it is the only node because if we 
have a one-node cluster, then

http://git-wip-us.apache.org/repos/asf/nifi/blob/01e2098d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
index 0b2f1fe..b3a3ab9 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
@@ -31,6 +31,7 @@ import 
org.apache.nifi.cluster.manager.exception.ConnectingNodeMutableRequestExc
 import 
org.apache.nifi.cluster.manager.exception.DisconnectedNodeMutableRequestException;
 import org.apache.nifi.cluster.manager.exception.IllegalClusterStateException;
 import org.apache.nifi.cluster.manager.exception.NoConnectedNodesException;
+import 
org.apache.nifi.cluster.manager.exception.OffloadedNodeMutableRequestException;
 import org.apache.nifi.cluster.manager.exception.UnknownNodeException;
 import org.apache.nifi.cluster.manager.exception.UriConstructionException;
 import org.apache.nifi.cluster.protocol.NodeIdentifier;

http://git-wip-us.apache.org/repos/asf/nifi/blob/01e2098d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
index 5b90e76..8c83a1d 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
@@ -486,11 +486,12 @@ public class NodeClusterCoordinator implements 
ClusterCoordinator, ProtocolHandl
         }
 
         if (state != NodeConnectionState.OFFLOADING) {
-            logger.warn("Attempted to finish node offload for {} but node is 
not in a offload state, it is currently {}.", nodeId, state);
+            logger.warn("Attempted to finish node offload for {} but node is 
not in the offloading state, it is currently {}.", nodeId, state);
             return;
         }
 
         logger.info("{} is now offloaded", nodeId);
+
         updateNodeStatus(new NodeConnectionStatus(nodeId, 
NodeConnectionState.OFFLOADED));
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/01e2098d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/groovy/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinatorSpec.groovy
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/groovy/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinatorSpec.groovy
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/groovy/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinatorSpec.groovy
new file mode 100644
index 0000000..2751b3e
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/groovy/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinatorSpec.groovy
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.coordination.node
+
+import org.apache.nifi.cluster.coordination.flow.FlowElection
+import org.apache.nifi.cluster.firewall.ClusterNodeFirewall
+import org.apache.nifi.cluster.protocol.NodeIdentifier
+import org.apache.nifi.cluster.protocol.NodeProtocolSender
+import 
org.apache.nifi.cluster.protocol.impl.ClusterCoordinationProtocolSenderListener
+import org.apache.nifi.cluster.protocol.message.OffloadMessage
+import org.apache.nifi.components.state.Scope
+import org.apache.nifi.components.state.StateManager
+import org.apache.nifi.components.state.StateManagerProvider
+import org.apache.nifi.controller.leader.election.LeaderElectionManager
+import org.apache.nifi.events.EventReporter
+import org.apache.nifi.reporting.Severity
+import org.apache.nifi.state.MockStateMap
+import org.apache.nifi.util.NiFiProperties
+import org.apache.nifi.web.revision.RevisionManager
+import spock.lang.Specification
+import spock.util.concurrent.BlockingVariable
+
+import java.util.concurrent.TimeUnit
+
+class NodeClusterCoordinatorSpec extends Specification {
+    def "requestNodeOffload"() {
+        given: 'mocked collaborators'
+        def clusterCoordinationProtocolSenderListener = 
Mock(ClusterCoordinationProtocolSenderListener)
+        def eventReporter = Mock EventReporter
+        def stateManager = Mock StateManager
+        def stateMap = new MockStateMap([:], 1)
+        stateManager.getState(_ as Scope) >> stateMap
+        def stateManagerProvider = Mock StateManagerProvider
+        stateManagerProvider.getStateManager(_ as String) >> stateManager
+
+        and: 'a NodeClusterCoordinator that manages node status in a 
synchronized list'
+        List<NodeConnectionStatus> nodeStatuses = [].asSynchronized()
+        def clusterCoordinator = new 
NodeClusterCoordinator(clusterCoordinationProtocolSenderListener, 
eventReporter, Mock(LeaderElectionManager),
+                Mock(FlowElection), Mock(ClusterNodeFirewall),
+                Mock(RevisionManager), 
NiFiProperties.createBasicNiFiProperties('src/test/resources/conf/nifi.properties',
 [:]),
+                Mock(NodeProtocolSender), stateManagerProvider) {
+            @Override
+            void notifyOthersOfNodeStatusChange(NodeConnectionStatus 
updatedStatus, boolean notifyAllNodes, boolean waitForCoordinator) {
+                nodeStatuses.add(updatedStatus)
+            }
+        }
+
+        and: 'two nodes'
+        def nodeIdentifier1 = createNodeIdentifier 1
+        def nodeIdentifier2 = createNodeIdentifier 2
+
+        and: 'node 1 is connected, node 2 is disconnected'
+        clusterCoordinator.updateNodeStatus new 
NodeConnectionStatus(nodeIdentifier1, NodeConnectionState.CONNECTED)
+        clusterCoordinator.updateNodeStatus new 
NodeConnectionStatus(nodeIdentifier2, NodeConnectionState.DISCONNECTED)
+        while (nodeStatuses.size() < 2) {
+            Thread.sleep(10)
+        }
+        nodeStatuses.clear()
+
+        def waitForReportEvent = new BlockingVariable(5, TimeUnit.SECONDS)
+
+        when: 'a node is requested to offload'
+        clusterCoordinator.requestNodeOffload nodeIdentifier2, 
OffloadCode.OFFLOADED, 'unit test for offloading node'
+        waitForReportEvent.get()
+
+        then: 'no exceptions are thrown'
+        noExceptionThrown()
+
+        and: 'expected methods on collaborators are invoked'
+        1 * clusterCoordinationProtocolSenderListener.offload({ OffloadMessage 
msg -> msg.nodeId == nodeIdentifier2 } as OffloadMessage)
+        1 * eventReporter.reportEvent(Severity.INFO, 'Clustering', { msg -> 
msg.contains "$nodeIdentifier2.apiAddress:$nodeIdentifier2.apiPort" } as 
String) >> {
+            waitForReportEvent.set(it)
+        }
+
+        and: 'the status of the offloaded node is known by the cluster 
coordinator to be offloading'
+        nodeStatuses[0].nodeIdentifier == nodeIdentifier2
+        nodeStatuses[0].state == NodeConnectionState.OFFLOADING
+    }
+
+    private static NodeIdentifier createNodeIdentifier(final int index) {
+        new NodeIdentifier("node-id-$index", "localhost", 8000 + index, 
"localhost", 9000 + index,
+                "localhost", 10000 + index, 11000 + index, false)
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/01e2098d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/groovy/org/apache/nifi/cluster/integration/OffloadNodeITSpec.groovy
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/groovy/org/apache/nifi/cluster/integration/OffloadNodeITSpec.groovy
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/groovy/org/apache/nifi/cluster/integration/OffloadNodeITSpec.groovy
new file mode 100644
index 0000000..a8dd158
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/groovy/org/apache/nifi/cluster/integration/OffloadNodeITSpec.groovy
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.integration
+
+import org.apache.nifi.cluster.coordination.node.DisconnectionCode
+import org.apache.nifi.cluster.coordination.node.OffloadCode
+import spock.lang.Specification
+
+import java.util.concurrent.TimeUnit
+
+class OffloadNodeITSpec extends Specification {
+    def "requestNodeOffload"() {
+        given: 'a cluster with 3 nodes'
+        System.setProperty 'nifi.properties.file.path', 
'src/test/resources/conf/nifi.properties'
+        def cluster = new Cluster()
+        cluster.start()
+        cluster.createNode()
+        def nodeToOffload = cluster.createNode()
+        cluster.createNode()
+        cluster.waitUntilAllNodesConnected 20, TimeUnit.SECONDS
+
+        when: 'the node to offload is disconnected successfully'
+        
cluster.currentClusterCoordinator.clusterCoordinator.requestNodeDisconnect 
nodeToOffload.identifier, DisconnectionCode.USER_DISCONNECTED,
+                'integration test user disconnect'
+        cluster.currentClusterCoordinator.assertNodeDisconnects 
nodeToOffload.identifier, 10, TimeUnit.SECONDS
+
+        and: 'the node to offload is requested to offload'
+        nodeToOffload.getClusterCoordinator().requestNodeOffload 
nodeToOffload.identifier, OffloadCode.OFFLOADED, 'integration test offload'
+
+        then: 'the node has been successfully offloaded'
+        cluster.currentClusterCoordinator.assertNodeIsOffloaded 
nodeToOffload.identifier, 10, TimeUnit.SECONDS
+
+        cleanup:
+        cluster.stop()
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/01e2098d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Cluster.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Cluster.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Cluster.java
index dab073d..370d6dc 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Cluster.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Cluster.java
@@ -144,6 +144,10 @@ public class Cluster {
         return node;
     }
 
+    public Node getCurrentClusterCoordinator() {
+        return getNodes().stream().filter(node -> 
node.hasRole(ClusterRoles.CLUSTER_COORDINATOR)).findFirst().orElse(null);
+    }
+
     public Node waitForClusterCoordinator(final long time, final TimeUnit 
timeUnit) {
         return ClusterUtils.waitUntilNonNull(time, timeUnit,
             () -> getNodes().stream().filter(node -> 
node.hasRole(ClusterRoles.CLUSTER_COORDINATOR)).findFirst().orElse(null));

http://git-wip-us.apache.org/repos/asf/nifi/blob/01e2098d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java
index 3133736..b2a499a 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/integration/Node.java
@@ -108,6 +108,8 @@ public class Node {
                     return String.valueOf(nodeId.getSocketPort());
                 }else if(key.equals(NiFiProperties.WEB_HTTP_PORT)){
                     return String.valueOf(nodeId.getApiPort());
+                }else if(key.equals(NiFiProperties.LOAD_BALANCE_PORT)){
+                    return String.valueOf(nodeId.getLoadBalancePort());
                 }else {
                     return properties.getProperty(key);
                 }
@@ -386,4 +388,17 @@ public class Node {
     public void assertNodeIsConnected(final NodeIdentifier nodeId) {
         Assert.assertEquals(NodeConnectionState.CONNECTED, 
getClusterCoordinator().getConnectionStatus(nodeId).getState());
     }
+
+    /**
+     * Assert that the node with the given ID is offloaded (according to this 
node!) within the given amount of time
+     *
+     * @param nodeId id of the node
+     * @param time how long to wait
+     * @param timeUnit unit of time provided by the 'time' argument
+     */
+    public void assertNodeIsOffloaded(final NodeIdentifier nodeId, final long 
time, final TimeUnit timeUnit) {
+        ClusterUtils.waitUntilConditionMet(time, timeUnit,
+                () -> 
getClusterCoordinator().getConnectionStatus(nodeId).getState() == 
NodeConnectionState.OFFLOADED,
+                () -> "Connection Status is " + 
getClusterCoordinator().getConnectionStatus(nodeId).toString());
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/01e2098d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
index 297595f..957357b 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
@@ -691,20 +691,19 @@ public class StandardFlowService implements FlowService, 
ProtocolHandler {
     private void offload(final String explanation) throws InterruptedException 
{
         writeLock.lock();
         try {
-
             logger.info("Offloading node due to " + explanation);
 
             // mark node as offloading
             controller.setConnectionStatus(new NodeConnectionStatus(nodeId, 
NodeConnectionState.OFFLOADING, OffloadCode.OFFLOADED, explanation));
             // request to stop all processors on node
             controller.stopAllProcessors();
-            // request to stop all remote process groups
-            
controller.getRootGroup().findAllRemoteProcessGroups().forEach(RemoteProcessGroup::stopTransmitting);
             // terminate all processors
             controller.getRootGroup().findAllProcessors()
                     // filter stream, only stopped processors can be terminated
                     .stream().filter(pn -> pn.getScheduledState() == 
ScheduledState.STOPPED)
                     .forEach(pn -> 
pn.getProcessGroup().terminateProcessor(pn));
+            // request to stop all remote process groups
+            
controller.getRootGroup().findAllRemoteProcessGroups().forEach(RemoteProcessGroup::stopTransmitting);
             // offload all queues on node
             controller.getAllQueues().forEach(FlowFileQueue::offloadQueue);
             // wait for rebalance of flowfiles on all queues
@@ -713,6 +712,7 @@ public class StandardFlowService implements FlowService, 
ProtocolHandler {
                 Thread.sleep(1000);
             }
             // finish offload
+            
controller.getAllQueues().forEach(FlowFileQueue::resetOffloadedQueue);
             controller.setConnectionStatus(new NodeConnectionStatus(nodeId, 
NodeConnectionState.OFFLOADED, OffloadCode.OFFLOADED, explanation));
             clusterCoordinator.finishNodeOffload(getNodeId());
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/01e2098d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/StandardFlowFileQueue.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/StandardFlowFileQueue.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/StandardFlowFileQueue.java
index ee222f4..8872ba7 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/StandardFlowFileQueue.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/StandardFlowFileQueue.java
@@ -79,6 +79,10 @@ public class StandardFlowFileQueue extends 
AbstractFlowFileQueue implements Flow
     }
 
     @Override
+    public void resetOffloadedQueue() {
+    }
+
+    @Override
     public boolean isActivelyLoadBalancing() {
         return false;
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/01e2098d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java
index e99d17d..7b3a211 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java
@@ -189,34 +189,41 @@ public class SocketLoadBalancedFlowFileQueue extends 
AbstractFlowFileQueue imple
         if (!offloaded) {
             // We are already load balancing but are changing how we are load 
balancing.
             final FlowFilePartitioner partitioner;
-            switch (strategy) {
-                case DO_NOT_LOAD_BALANCE:
-                    partitioner = new LocalPartitionPartitioner();
-                    break;
-                case PARTITION_BY_ATTRIBUTE:
-                    partitioner = new 
CorrelationAttributePartitioner(partitioningAttribute);
-                    break;
-                case ROUND_ROBIN:
-                    partitioner = new RoundRobinPartitioner();
-                    break;
-                case SINGLE_NODE:
-                    partitioner = new FirstNodePartitioner();
-                    break;
-                default:
-                    throw new IllegalArgumentException();
-            }
+            partitioner = getPartitionerForLoadBalancingStrategy(strategy, 
partitioningAttribute);
 
             setFlowFilePartitioner(partitioner);
         }
     }
 
+    private FlowFilePartitioner 
getPartitionerForLoadBalancingStrategy(LoadBalanceStrategy strategy, String 
partitioningAttribute) {
+        FlowFilePartitioner partitioner;
+        switch (strategy) {
+            case DO_NOT_LOAD_BALANCE:
+                partitioner = new LocalPartitionPartitioner();
+                break;
+            case PARTITION_BY_ATTRIBUTE:
+                partitioner = new 
CorrelationAttributePartitioner(partitioningAttribute);
+                break;
+            case ROUND_ROBIN:
+                partitioner = new RoundRobinPartitioner();
+                break;
+            case SINGLE_NODE:
+                partitioner = new FirstNodePartitioner();
+                break;
+            default:
+                throw new IllegalArgumentException();
+        }
+        return partitioner;
+    }
+
     @Override
     public void offloadQueue() {
         if (clusterCoordinator == null) {
-            // Not clustered, so don't change partitions
+            // Not clustered, cannot offload the queue to other nodes
             return;
         }
 
+        logger.debug("Setting queue {} on node {} as offloaded", this, 
clusterCoordinator.getLocalNodeIdentifier());
         offloaded = true;
 
         partitionWriteLock.lock();
@@ -248,6 +255,26 @@ public class SocketLoadBalancedFlowFileQueue extends 
AbstractFlowFileQueue imple
         }
     }
 
+    @Override
+    public void resetOffloadedQueue() {
+        if (clusterCoordinator == null) {
+            // Not clustered, was not offloading the queue to other nodes
+            return;
+        }
+
+        if (offloaded) {
+            // queue was offloaded previously, allow files to be added to the 
local partition
+            offloaded = false;
+            logger.debug("Queue {} on node {} was previously offloaded, 
resetting offloaded status to {}",
+                    this, clusterCoordinator.getLocalNodeIdentifier(), 
offloaded);
+            // reset the partitioner based on the load balancing strategy, 
since offloading previously changed the partitioner
+            FlowFilePartitioner partitioner = 
getPartitionerForLoadBalancingStrategy(getLoadBalanceStrategy(), 
getPartitioningAttribute());
+            setFlowFilePartitioner(partitioner);
+            logger.debug("Queue {} is no longer offloaded, restored load 
balance strategy to {} and partitioning attribute to \"{}\"",
+                    this, getLoadBalanceStrategy(), 
getPartitioningAttribute());
+        }
+    }
+
     public synchronized void startLoadBalancing() {
         logger.debug("{} started. Will begin distributing FlowFiles across the 
cluster", this);
 
@@ -884,8 +911,9 @@ public class SocketLoadBalancedFlowFileQueue extends 
AbstractFlowFileQueue imple
 
     @Override
     public boolean isPropagateBackpressureAcrossNodes() {
-        // TODO: We will want to modify this when we have the ability to 
offload flowfiles from a node.
-        return true;
+        // If offloaded = false, the queue is not offloading; return true to 
honor backpressure
+        // If offloaded = true, the queue is offloading or has finished 
offloading; return false to ignore backpressure
+        return !offloaded;
     }
 
     @Override
@@ -1096,6 +1124,12 @@ public class SocketLoadBalancedFlowFileQueue extends 
AbstractFlowFileQueue imple
                 }
 
                 switch (newState) {
+                    case CONNECTED:
+                        if (nodeId != null && 
nodeId.equals(clusterCoordinator.getLocalNodeIdentifier())) {
+                            // the node with this queue was connected to the 
cluster, make sure the queue is not offloaded
+                            resetOffloadedQueue();
+                        }
+                        break;
                     case OFFLOADED:
                     case OFFLOADING:
                     case DISCONNECTED:

http://git-wip-us.apache.org/repos/asf/nifi/blob/01e2098d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/groovy/org/apache/nifi/controller/StandardFlowServiceSpec.groovy
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/groovy/org/apache/nifi/controller/StandardFlowServiceSpec.groovy
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/groovy/org/apache/nifi/controller/StandardFlowServiceSpec.groovy
new file mode 100644
index 0000000..63fedab
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/groovy/org/apache/nifi/controller/StandardFlowServiceSpec.groovy
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller
+
+import org.apache.nifi.authorization.Authorizer
+import org.apache.nifi.cluster.coordination.ClusterCoordinator
+import org.apache.nifi.cluster.coordination.node.NodeConnectionState
+import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus
+import org.apache.nifi.cluster.coordination.node.OffloadCode
+import org.apache.nifi.cluster.protocol.NodeIdentifier
+import org.apache.nifi.cluster.protocol.impl.NodeProtocolSenderListener
+import org.apache.nifi.cluster.protocol.message.OffloadMessage
+import org.apache.nifi.components.state.Scope
+import org.apache.nifi.components.state.StateManager
+import org.apache.nifi.components.state.StateManagerProvider
+import org.apache.nifi.controller.queue.FlowFileQueue
+import org.apache.nifi.controller.status.ProcessGroupStatus
+import org.apache.nifi.encrypt.StringEncryptor
+import org.apache.nifi.groups.ProcessGroup
+import org.apache.nifi.groups.RemoteProcessGroup
+import org.apache.nifi.state.MockStateMap
+import org.apache.nifi.util.NiFiProperties
+import org.apache.nifi.web.revision.RevisionManager
+import spock.lang.Specification
+import spock.util.concurrent.BlockingVariable
+
+import java.util.concurrent.TimeUnit
+
+class StandardFlowServiceSpec extends Specification {
+    def "handle an OffloadMessage"() {
+        given: 'a node to offload'
+        def nodeToOffload = createNodeIdentifier 1
+
+        and: 'a simple flow with one root group and a single processor'
+        def stateManager = Mock StateManager
+        def stateMap = new MockStateMap([:], 1)
+        stateManager.getState(_ as Scope) >> stateMap
+        def stateManagerProvider = Mock StateManagerProvider
+        stateManagerProvider.getStateManager(_ as String) >> stateManager
+
+        def rootGroup = Mock ProcessGroup
+        def flowController = Mock FlowController
+        flowController.getStateManagerProvider() >> stateManagerProvider
+        _ * flowController.rootGroup >> rootGroup
+
+        def clusterCoordinator = Mock ClusterCoordinator
+
+        def processGroupStatus = Mock ProcessGroupStatus
+        def processorNode = Mock ProcessorNode
+        def remoteProcessGroup = Mock RemoteProcessGroup
+        def flowFileQueue = Mock FlowFileQueue
+
+        and: 'a flow service to handle the OffloadMessage'
+        def flowService = 
StandardFlowService.createClusteredInstance(flowController, 
NiFiProperties.createBasicNiFiProperties('src/test/resources/conf/nifi.properties',
+                [(NiFiProperties.CLUSTER_NODE_PROTOCOL_PORT): 
nodeToOffload.socketPort as String,
+                 (NiFiProperties.WEB_HTTP_PORT)             : 
nodeToOffload.apiPort as String,
+                 (NiFiProperties.LOAD_BALANCE_PORT)         : 
nodeToOffload.loadBalancePort as String]),
+                Mock(NodeProtocolSenderListener), clusterCoordinator, 
Mock(StringEncryptor), Mock(RevisionManager), Mock(Authorizer))
+
+        def waitForFinishOffload = new BlockingVariable(5, 
TimeUnit.SECONDS)//new CountDownLatch(1)
+
+        when: 'the flow services receives an OffloadMessage'
+        flowService.handle(new OffloadMessage(nodeId: nodeToOffload, 
explanation: 'unit test offload'), [] as Set)
+        waitForFinishOffload.get()
+
+        then: 'no exceptions are thrown'
+        noExceptionThrown()
+
+        and: 'the connection status for the node in the flow controller is set 
to OFFLOADING'
+        1 * flowController.setConnectionStatus({ NodeConnectionStatus status ->
+            status.nodeIdentifier.logicallyEquals(nodeToOffload) && 
status.state == NodeConnectionState.OFFLOADING && status.offloadCode == 
OffloadCode.OFFLOADED
+        } as NodeConnectionStatus)
+
+        then: 'all processors are requested to stop'
+        1 * flowController.stopAllProcessors()
+
+        then: 'all processors are requested to terminate'
+        1 * processorNode.scheduledState >> ScheduledState.STOPPED
+        1 * processorNode.processGroup >> rootGroup
+        1 * rootGroup.terminateProcessor({ ProcessorNode pn -> pn == 
processorNode } as ProcessorNode)
+        1 * rootGroup.findAllProcessors() >> [processorNode]
+
+        then: 'all remote process groups are requested to terminate'
+        1 * remoteProcessGroup.stopTransmitting()
+        1 * rootGroup.findAllRemoteProcessGroups() >> [remoteProcessGroup]
+
+        then: 'all queues are requested to offload'
+        1 * flowFileQueue.offloadQueue()
+        1 * flowController.getAllQueues() >> [flowFileQueue]
+
+        then: 'the queued count in the flow controller status is 0 to allow 
the offloading code to to complete'
+        1 * flowController.getControllerStatus() >> processGroupStatus
+        1 * processGroupStatus.getQueuedCount() >> 0
+
+        then: 'all queues are requested to reset to the original partitioner 
for the load balancing strategy'
+        1 * flowFileQueue.resetOffloadedQueue()
+        1 * flowController.getAllQueues() >> [flowFileQueue]
+
+        then: 'the connection status for the node in the flow controller is 
set to OFFLOADED'
+        1 * flowController.setConnectionStatus({ NodeConnectionStatus status ->
+            status.nodeIdentifier.logicallyEquals(nodeToOffload) && 
status.state == NodeConnectionState.OFFLOADED && status.offloadCode == 
OffloadCode.OFFLOADED
+        } as NodeConnectionStatus)
+
+        then: 'the cluster coordinator is requested to finish the node offload'
+        1 * clusterCoordinator.finishNodeOffload({ NodeIdentifier 
nodeIdentifier ->
+            nodeIdentifier.logicallyEquals(nodeToOffload)
+        } as NodeIdentifier) >> { waitForFinishOffload.set(it) }
+    }
+
+    private static NodeIdentifier createNodeIdentifier(final int index) {
+        new NodeIdentifier("node-id-$index", "localhost", 8000 + index, 
"localhost", 9000 + index,
+                "localhost", 10000 + index, 11000 + index, false)
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/01e2098d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/groovy/org/apache/nifi/controller/queue/clustered/partition/NonLocalPartitionPartitionerSpec.groovy
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/groovy/org/apache/nifi/controller/queue/clustered/partition/NonLocalPartitionPartitionerSpec.groovy
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/groovy/org/apache/nifi/controller/queue/clustered/partition/NonLocalPartitionPartitionerSpec.groovy
new file mode 100644
index 0000000..f3cfd4c
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/groovy/org/apache/nifi/controller/queue/clustered/partition/NonLocalPartitionPartitionerSpec.groovy
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.queue.clustered.partition
+
+
+import org.apache.nifi.controller.repository.FlowFileRecord
+import spock.lang.Specification
+import spock.lang.Unroll
+
+import java.util.concurrent.ConcurrentLinkedQueue
+import java.util.concurrent.Executors
+import java.util.concurrent.TimeUnit
+
+class NonLocalPartitionPartitionerSpec extends Specification {
+
+    def "getPartition chooses local partition with 1 partition and throws 
IllegalStateException"() {
+        given: "a local partitioner using a local partition"
+        def partitioner = new NonLocalPartitionPartitioner()
+        def localPartition = Mock QueuePartition
+        def partitions = [localPartition] as QueuePartition[]
+        def flowFileRecord = Mock FlowFileRecord
+
+        when: "a partition is requested from the partitioner"
+        partitioner.getPartition flowFileRecord, partitions, localPartition
+
+        then: "an IllegalStateExceptions thrown"
+        thrown(IllegalStateException)
+    }
+
+    @Unroll
+    def "getPartition chooses non-local partition with #maxPartitions 
partitions, #threads threads, #iterations iterations"() {
+        given: "a local partitioner"
+        def partitioner = new NonLocalPartitionPartitioner()
+        def partitions = new QueuePartition[maxPartitions]
+
+        and: "a local partition"
+        def localPartition = Mock QueuePartition
+        partitions[0] = localPartition
+
+        and: "one or more multiple partitions"
+        for (int id = 1; id < maxPartitions; ++id) {
+            def partition = Mock QueuePartition
+            partitions[id] = partition
+        }
+
+        and: "an array to hold the resulting chosen partitions and an executor 
service with one or more threads"
+        def flowFileRecord = Mock FlowFileRecord
+        def chosenPartitions = [] as ConcurrentLinkedQueue
+        def executorService = Executors.newFixedThreadPool threads
+
+        when: "a partition is requested from the partitioner for a given 
flowfile record and the existing partitions"
+        iterations.times {
+            executorService.submit {
+                chosenPartitions.add partitioner.getPartition(flowFileRecord, 
partitions, localPartition)
+            }
+        }
+        executorService.shutdown()
+        try {
+            while (!executorService.awaitTermination(10, 
TimeUnit.MILLISECONDS)) {
+                Thread.sleep(10)
+            }
+        } catch (InterruptedException e) {
+            executorService.shutdownNow()
+            Thread.currentThread().interrupt()
+        }
+
+        then: "no exceptions are thrown"
+        noExceptionThrown()
+
+        and: "there is a chosen partition for each iteration"
+        chosenPartitions.size() == iterations
+
+        and: "each chosen partition is a remote partition and is one of the 
existing partitions"
+        def validChosenPartitions = chosenPartitions.findAll { it != 
localPartition && partitions.contains(it) }
+
+        and: "there is a valid chosen partition for each iteration"
+        validChosenPartitions.size() == iterations
+
+        and: "there are no other mock interactions"
+        0 * _
+
+        where:
+        maxPartitions | threads | iterations
+        2             | 1       | 1
+        2             | 1       | 10
+        2             | 1       | 100
+        2             | 10      | 1000
+        5             | 1       | 1
+        5             | 1       | 10
+        5             | 1       | 100
+        5             | 10      | 1000
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/01e2098d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
index 878ad13..a3ee5c1 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
@@ -114,6 +114,10 @@ public class TestWriteAheadFlowFileRepository {
             }
 
             @Override
+            public void resetOffloadedQueue() {
+            }
+
+            @Override
             public boolean isActivelyLoadBalancing() {
                 return false;
             }

http://git-wip-us.apache.org/repos/asf/nifi/blob/01e2098d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
index 4ef241e..d875118 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
@@ -4707,7 +4707,8 @@ public class StandardNiFiServiceFacade implements 
NiFiServiceFacade {
 
         final NodeConnectionStatus nodeConnectionStatus = 
clusterCoordinator.getConnectionStatus(nodeIdentifier);
         if 
(!nodeConnectionStatus.getState().equals(NodeConnectionState.OFFLOADED) && 
!nodeConnectionStatus.getState().equals(NodeConnectionState.DISCONNECTED)) {
-            throw new IllegalNodeDeletionException("Cannot remove Node with ID 
" + nodeId + " because it is not disconnected, current state = " + 
nodeConnectionStatus.getState());
+            throw new IllegalNodeDeletionException("Cannot remove Node with ID 
" + nodeId +
+                    " because it is not disconnected or offloaded, current 
state = " + nodeConnectionStatus.getState());
         }
 
         clusterCoordinator.removeNode(nodeIdentifier, userDn);

http://git-wip-us.apache.org/repos/asf/nifi/blob/01e2098d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/frontend/package-lock.json
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/frontend/package-lock.json
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/frontend/package-lock.json
index 1d848a6..3f505a2 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/frontend/package-lock.json
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/frontend/package-lock.json
@@ -362,9 +362,9 @@
       "integrity": 
"sha512-ipiDYhdQSCZ4hSbX4rMW+XzNKMD1prg/sTvoVmSLkuQ1MVlwjJQQA+sW8tMYR3BLUr9KjodFV4pvzunvRhd33Q=="
     },
     "font-awesome": {
-      "version": "4.6.1",
-      "resolved": 
"https://registry.npmjs.org/font-awesome/-/font-awesome-4.6.1.tgz";,
-      "integrity": "sha1-VHJl+0xFu+2Qq4vE93qXs3uFKhI="
+      "version": "4.7.0",
+      "resolved": 
"https://registry.npmjs.org/font-awesome/-/font-awesome-4.7.0.tgz";,
+      "integrity": "sha1-j6jPBBGhoxr9B7BtKQK7n8gVoTM="
     },
     "has-color": {
       "version": "0.1.7",

http://git-wip-us.apache.org/repos/asf/nifi/blob/01e2098d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/resources/META-INF/NOTICE
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/resources/META-INF/NOTICE
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/resources/META-INF/NOTICE
index c0b74bc..300c6b9 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/resources/META-INF/NOTICE
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/resources/META-INF/NOTICE
@@ -8,4 +8,4 @@ SIL OFL 1.1
 ******************
 
 The following binary components are provided under the SIL Open Font License 
1.1
-  (SIL OFL 1.1) FontAwesome (4.6.1 - 
http://fortawesome.github.io/Font-Awesome/license/)
+  (SIL OFL 1.1) FontAwesome (4.7.0 - https://fontawesome.com/license/free)

http://git-wip-us.apache.org/repos/asf/nifi/blob/01e2098d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/cluster/nf-cluster-table.js
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/cluster/nf-cluster-table.js
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/cluster/nf-cluster-table.js
index 0dc74d7..f90cd3a 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/cluster/nf-cluster-table.js
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/cluster/nf-cluster-table.js
@@ -630,34 +630,25 @@
         // only allow the admin to modify the cluster
         if (nfCommon.canModifyController()) {
             var actionFormatter = function (row, cell, value, columnDef, 
dataContext) {
-                var canDisconnect = false;
-                var canConnect = false;
-                var isOffloaded = false;
-
-                // determine the current status
+                var connectDiv = '<div title="Connect" class="pointer 
prompt-for-connect fa fa-plug"></div>';
+                var deleteDiv = '<div title="Delete" class="pointer 
prompt-for-removal fa fa-trash"></div>';
+                var disconnectDiv = '<div title="Disconnect" class="pointer 
prompt-for-disconnect fa fa-power-off"></div>';
+                var offloadDiv = '<div title="Offload" class="pointer 
prompt-for-offload fa fa-rotate-90 fa-upload" ' +
+                    'style="margin-top: 5px;margin-left: 5px;margin-right: 
-2px;"></div>';
+                var markup = '';
+
+                // determine the current status and create the appropriate 
markup
                 if (dataContext.status === 'CONNECTED' || dataContext.status 
=== 'CONNECTING') {
-                    canDisconnect = true;
-                }
-                if (dataContext.status === 'DISCONNECTED') {
-                    canConnect = true;
-                }
-                if (dataContext.status === 'OFFLOADED') {
-                    isOffloaded = true;
-                }
-
-                // return the appropriate markup
-                if (canConnect) {
-                    return '<div title="Connect" class="pointer 
prompt-for-connect fa fa-plug"></div>' +
-                        '<div title="Delete" class="pointer prompt-for-removal 
fa fa-trash"></div>' +
-                        '<div title="Offload" class="pointer 
prompt-for-offload fa fa-rotate-90 fa-upload"></div>';
-                } else if (canDisconnect) {
-                    return '<div title="Disconnect" class="pointer 
prompt-for-disconnect fa fa-power-off"></div>';
-                } else if (isOffloaded) {
-                    return '<div title="Connect" class="pointer 
prompt-for-connect fa fa-plug"></div>' +
-                        '<div title="Delete" class="pointer prompt-for-removal 
fa fa-trash"></div>';
+                    markup += disconnectDiv;
+                } else if (dataContext.status === 'DISCONNECTED') {
+                    markup += connectDiv + offloadDiv + deleteDiv;
+                } else if (dataContext.status === 'OFFLOADED') {
+                    markup += connectDiv + deleteDiv;
                 } else {
-                    return '<div style="width: 16px; height: 
16px;">&nbsp;</div>';
+                    markup += '<div style="width: 16px; height: 
16px;">&nbsp;</div>';
                 }
+
+                return markup;
             };
 
             columnModel.push({

http://git-wip-us.apache.org/repos/asf/nifi/blob/01e2098d/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/nodes/DisconnectNode.java
----------------------------------------------------------------------
diff --git 
a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/nodes/DisconnectNode.java
 
b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/nodes/DisconnectNode.java
index 98fa03a..65a7e72 100644
--- 
a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/nodes/DisconnectNode.java
+++ 
b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/command/nifi/nodes/DisconnectNode.java
@@ -57,7 +57,7 @@ public class DisconnectNode extends 
AbstractNiFiCommand<NodeResult> {
 
         NodeDTO nodeDto = new NodeDTO();
         nodeDto.setNodeId(nodeId);
-        // TODO There's no constant for node status in
+        // TODO There are no constants for the DISCONNECT node status
         nodeDto.setStatus("DISCONNECTING");
         NodeEntity nodeEntity = new NodeEntity();
         nodeEntity.setNode(nodeDto);

http://git-wip-us.apache.org/repos/asf/nifi/blob/01e2098d/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 7d433b4..b86cada 100644
--- a/pom.xml
+++ b/pom.xml
@@ -342,6 +342,9 @@
                             <include>**/Test*.class</include>
                             <include>**/*Spec.class</include>
                         </includes>
+                        <excludes>
+                            <exclude>**/*ITSpec.class</exclude>
+                        </excludes>
                         
<redirectTestOutputToFile>true</redirectTestOutputToFile>
                         <argLine combine.children="append">-Xmx1G
                             -Djava.net.preferIPv4Stack=true

Reply via email to