NIFI-5585 Added capability to offload a node that is disconnected from the 
cluster.
Updated NodeClusterCoordinator to allow idempotent requests to offload a node
Added Toolkit CLI commands to connect, delete, disconnect, and offload a 
node in the cluster
Added Toolkit CLI commands to get the status of nodes in the cluster
Upgraded FontAwesome to 4.7.0 (from 4.6.1)
Added icon "fa-upload" for offloading nodes in the cluster table UI


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/04d8da8f
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/04d8da8f
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/04d8da8f

Branch: refs/heads/master
Commit: 04d8da8f46c26bd829c4411ea92692589d93278a
Parents: 83ca676
Author: Jeff Storck <[email protected]>
Authored: Tue Sep 18 17:09:13 2018 -0400
Committer: Mark Payne <[email protected]>
Committed: Thu Oct 11 09:23:00 2018 -0400

----------------------------------------------------------------------
 .../nifi/controller/queue/FlowFileQueue.java    |  2 +
 .../coordination/ClusterCoordinator.java        | 18 ++++
 .../ClusterTopologyEventListener.java           |  2 +
 .../coordination/node/NodeConnectionState.java  | 10 +++
 .../coordination/node/NodeConnectionStatus.java | 45 ++++++----
 .../cluster/coordination/node/OffloadCode.java  | 40 +++++++++
 .../ClusterCoordinationProtocolSender.java      |  9 ++
 ...usterCoordinationProtocolSenderListener.java |  6 ++
 .../protocol/impl/SocketProtocolListener.java   |  3 +
 ...andardClusterCoordinationProtocolSender.java | 26 ++++++
 .../message/AdaptedNodeConnectionStatus.java    | 20 +++--
 .../message/NodeConnectionStatusAdapter.java    |  6 +-
 .../protocol/jaxb/message/ObjectFactory.java    |  5 ++
 .../protocol/message/OffloadMessage.java        | 53 +++++++++++
 .../protocol/message/ProtocolMessage.java       |  1 +
 .../heartbeat/AbstractHeartbeatMonitor.java     |  8 +-
 .../ThreadPoolRequestReplicator.java            | 18 ++++
 .../node/NodeClusterCoordinator.java            | 94 ++++++++++++++++++--
 .../exception/IllegalNodeOffloadException.java  | 38 ++++++++
 .../OffloadedNodeMutableRequestException.java   | 39 ++++++++
 .../heartbeat/TestAbstractHeartbeatMonitor.java | 11 +++
 .../node/TestNodeClusterCoordinator.java        |  4 +-
 .../nifi/controller/StandardFlowService.java    | 71 +++++++++++++--
 .../controller/queue/StandardFlowFileQueue.java |  4 +
 .../SocketLoadBalancedFlowFileQueue.java        | 34 +++++++
 .../partition/NonLocalPartitionPartitioner.java | 58 ++++++++++++
 .../TestWriteAheadFlowFileRepository.java       |  4 +
 .../nifi/web/StandardNiFiServiceFacade.java     |  6 +-
 .../IllegalNodeOffloadExceptionMapper.java      | 46 ++++++++++
 .../nifi-web-ui/src/main/frontend/package.json  |  2 +-
 .../webapp/js/nf/cluster/nf-cluster-table.js    | 60 ++++++++++++-
 .../cli/impl/client/nifi/ControllerClient.java  | 14 +++
 .../nifi/impl/JerseyControllerClient.java       | 87 ++++++++++++++++++
 .../toolkit/cli/impl/command/CommandOption.java |  3 +
 .../cli/impl/command/nifi/NiFiCommandGroup.java | 12 +++
 .../impl/command/nifi/nodes/ConnectNode.java    | 67 ++++++++++++++
 .../cli/impl/command/nifi/nodes/DeleteNode.java | 58 ++++++++++++
 .../impl/command/nifi/nodes/DisconnectNode.java | 67 ++++++++++++++
 .../cli/impl/command/nifi/nodes/GetNode.java    | 59 ++++++++++++
 .../cli/impl/command/nifi/nodes/GetNodes.java   | 52 +++++++++++
 .../impl/command/nifi/nodes/OffloadNode.java    | 67 ++++++++++++++
 .../toolkit/cli/impl/result/NodeResult.java     | 48 ++++++++++
 .../toolkit/cli/impl/result/NodesResult.java    | 66 ++++++++++++++
 43 files changed, 1299 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/04d8da8f/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java
----------------------------------------------------------------------
diff --git 
a/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java
 
b/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java
index 2c7f55b..7cd0e30 100644
--- 
a/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java
+++ 
b/nifi-framework-api/src/main/java/org/apache/nifi/controller/queue/FlowFileQueue.java
@@ -267,6 +267,8 @@ public interface FlowFileQueue {
 
     void setLoadBalanceStrategy(LoadBalanceStrategy strategy, String 
partitioningAttribute);
 
+    void offloadQueue();
+
     LoadBalanceStrategy getLoadBalanceStrategy();
 
     void setLoadBalanceCompression(LoadBalanceCompression compression);

http://git-wip-us.apache.org/repos/asf/nifi/blob/04d8da8f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java
index 11786c2..2ad0e70 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterCoordinator.java
@@ -17,6 +17,7 @@
 
 package org.apache.nifi.cluster.coordination;
 
+import org.apache.nifi.cluster.coordination.node.OffloadCode;
 import org.apache.nifi.cluster.coordination.node.DisconnectionCode;
 import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
 import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
@@ -62,6 +63,23 @@ public interface ClusterCoordinator {
     void finishNodeConnection(NodeIdentifier nodeId);
 
     /**
+     * Indicates that the node has finished being offloaded
+     *
+     * @param nodeId the identifier of the node
+     */
+    void finishNodeOffload(NodeIdentifier nodeId);
+
+    /**
+     * Sends a request to the node to be offloaded.
+     * The node will be marked as offloading immediately.
+     *
+     * @param nodeId the identifier of the node
+     * @param offloadCode the code that represents why this node is being 
asked to be offloaded
+     * @param explanation an explanation as to why the node is being asked to 
be offloaded
+     */
+    void requestNodeOffload(NodeIdentifier nodeId, OffloadCode offloadCode, 
String explanation);
+
+    /**
      * Sends a request to the node to disconnect from the cluster.
      * The node will be marked as disconnected immediately.
      *

http://git-wip-us.apache.org/repos/asf/nifi/blob/04d8da8f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterTopologyEventListener.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterTopologyEventListener.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterTopologyEventListener.java
index 54cc4de..ad9be3d 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterTopologyEventListener.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/ClusterTopologyEventListener.java
@@ -23,6 +23,8 @@ public interface ClusterTopologyEventListener {
 
     void onNodeAdded(NodeIdentifier nodeId);
 
+    void onNodeOffloaded(NodeIdentifier nodeId);
+
     void onNodeRemoved(NodeIdentifier nodeId);
 
     void onLocalNodeIdentifierSet(NodeIdentifier localNodeId);

http://git-wip-us.apache.org/repos/asf/nifi/blob/04d8da8f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/NodeConnectionState.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/NodeConnectionState.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/NodeConnectionState.java
index 8d5824f..d79552c 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/NodeConnectionState.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/NodeConnectionState.java
@@ -37,12 +37,22 @@ public enum NodeConnectionState {
     CONNECTED,
 
     /**
+     * A node that is in the process of offloading its flow files from the 
node.
+     */
+    OFFLOADING,
+
+    /**
      * A node that is in the process of disconnecting from the cluster.
      * A DISCONNECTING node will always transition to DISCONNECTED.
      */
     DISCONNECTING,
 
     /**
+     * A node that has offloaded its flow files from the node.
+     */
+    OFFLOADED,
+
+    /**
      * A node that is not connected to the cluster.
      * A DISCONNECTED node can transition to CONNECTING.
      */

http://git-wip-us.apache.org/repos/asf/nifi/blob/04d8da8f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/NodeConnectionStatus.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/NodeConnectionStatus.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/NodeConnectionStatus.java
index 34bd127..7d8a940 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/NodeConnectionStatus.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/NodeConnectionStatus.java
@@ -35,47 +35,53 @@ public class NodeConnectionStatus {
     private final long updateId;
     private final NodeIdentifier nodeId;
     private final NodeConnectionState state;
+    private final OffloadCode offloadCode;
     private final DisconnectionCode disconnectCode;
-    private final String disconnectReason;
+    private final String reason;
     private final Long connectionRequestTime;
 
 
     public NodeConnectionStatus(final NodeIdentifier nodeId, final 
NodeConnectionState state) {
-        this(nodeId, state, null, null, null);
+        this(nodeId, state, null, null, null, null);
     }
 
     public NodeConnectionStatus(final NodeIdentifier nodeId, final 
DisconnectionCode disconnectionCode) {
-        this(nodeId, NodeConnectionState.DISCONNECTED, disconnectionCode, 
disconnectionCode.toString(), null);
+        this(nodeId, NodeConnectionState.DISCONNECTED, null, 
disconnectionCode, disconnectionCode.toString(), null);
+    }
+
+    public NodeConnectionStatus(final NodeIdentifier nodeId, final 
NodeConnectionState state, final OffloadCode offloadCode, final String 
offloadExplanation) {
+        this(nodeId, state, offloadCode, null, offloadExplanation, null);
     }
 
     public NodeConnectionStatus(final NodeIdentifier nodeId, final 
DisconnectionCode disconnectionCode, final String disconnectionExplanation) {
-        this(nodeId, NodeConnectionState.DISCONNECTED, disconnectionCode, 
disconnectionExplanation, null);
+        this(nodeId, NodeConnectionState.DISCONNECTED, null, 
disconnectionCode, disconnectionExplanation, null);
     }
 
     public NodeConnectionStatus(final NodeIdentifier nodeId, final 
NodeConnectionState state, final DisconnectionCode disconnectionCode) {
-        this(nodeId, state, disconnectionCode, disconnectionCode == null ? 
null : disconnectionCode.toString(), null);
+        this(nodeId, state, null, disconnectionCode, disconnectionCode == null 
? null : disconnectionCode.toString(), null);
     }
 
     public NodeConnectionStatus(final NodeConnectionStatus status) {
-        this(status.getNodeIdentifier(), status.getState(), 
status.getDisconnectCode(), status.getDisconnectReason(), 
status.getConnectionRequestTime());
+        this(status.getNodeIdentifier(), status.getState(), 
status.getOffloadCode(), status.getDisconnectCode(), status.getReason(), 
status.getConnectionRequestTime());
     }
 
-    public NodeConnectionStatus(final NodeIdentifier nodeId, final 
NodeConnectionState state, final DisconnectionCode disconnectCode,
-        final String disconnectReason, final Long connectionRequestTime) {
-        this(idGenerator.getAndIncrement(), nodeId, state, disconnectCode, 
disconnectReason, connectionRequestTime);
+    public NodeConnectionStatus(final NodeIdentifier nodeId, final 
NodeConnectionState state, final OffloadCode offloadCode,
+                                final DisconnectionCode disconnectCode, final 
String reason, final Long connectionRequestTime) {
+        this(idGenerator.getAndIncrement(), nodeId, state, offloadCode, 
disconnectCode, reason, connectionRequestTime);
     }
 
-    public NodeConnectionStatus(final long updateId, final NodeIdentifier 
nodeId, final NodeConnectionState state, final DisconnectionCode disconnectCode,
-        final String disconnectReason, final Long connectionRequestTime) {
+    public NodeConnectionStatus(final long updateId, final NodeIdentifier 
nodeId, final NodeConnectionState state, final OffloadCode offloadCode,
+                                final DisconnectionCode disconnectCode, final 
String reason, final Long connectionRequestTime) {
         this.updateId = updateId;
         this.nodeId = nodeId;
         this.state = state;
+        this.offloadCode = offloadCode;
         if (state == NodeConnectionState.DISCONNECTED && disconnectCode == 
null) {
             this.disconnectCode = DisconnectionCode.UNKNOWN;
-            this.disconnectReason = this.disconnectCode.toString();
+            this.reason = this.disconnectCode.toString();
         } else {
             this.disconnectCode = disconnectCode;
-            this.disconnectReason = disconnectReason;
+            this.reason = reason;
         }
 
         this.connectionRequestTime = (connectionRequestTime == null && state 
== NodeConnectionState.CONNECTING) ? Long.valueOf(System.currentTimeMillis()) : 
connectionRequestTime;
@@ -93,12 +99,16 @@ public class NodeConnectionStatus {
         return state;
     }
 
+    public OffloadCode getOffloadCode() {
+        return offloadCode;
+    }
+
     public DisconnectionCode getDisconnectCode() {
         return disconnectCode;
     }
 
-    public String getDisconnectReason() {
-        return disconnectReason;
+    public String getReason() {
+        return reason;
     }
 
     public Long getConnectionRequestTime() {
@@ -110,8 +120,11 @@ public class NodeConnectionStatus {
         final StringBuilder sb = new StringBuilder();
         final NodeConnectionState state = getState();
         sb.append("NodeConnectionStatus[nodeId=").append(nodeId).append(", 
state=").append(state);
+        if (state == NodeConnectionState.OFFLOADED || state == 
NodeConnectionState.OFFLOADING) {
+            sb.append(", Offload Code=").append(getOffloadCode()).append(", 
Offload Reason=").append(getReason());
+        }
         if (state == NodeConnectionState.DISCONNECTED || state == 
NodeConnectionState.DISCONNECTING) {
-            sb.append(", Disconnect 
Code=").append(getDisconnectCode()).append(", Disconnect 
Reason=").append(getDisconnectReason());
+            sb.append(", Disconnect 
Code=").append(getDisconnectCode()).append(", Disconnect 
Reason=").append(getReason());
         }
         sb.append(", updateId=").append(getUpdateIdentifier());
         sb.append("]");

http://git-wip-us.apache.org/repos/asf/nifi/blob/04d8da8f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/OffloadCode.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/OffloadCode.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/OffloadCode.java
new file mode 100644
index 0000000..fb4d30b
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/coordination/node/OffloadCode.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.coordination.node;
+
+/**
+ * An enumeration of the reasons that a node may be offloaded
+ */
+public enum OffloadCode {
+
+    /**
+     * A user explicitly offloaded the node
+     */
+    OFFLOADED("Node Offloaded");
+
+    private final String description;
+
+    OffloadCode(final String description) {
+        this.description = description;
+    }
+
+    @Override
+    public String toString() {
+        return description;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/04d8da8f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ClusterCoordinationProtocolSender.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ClusterCoordinationProtocolSender.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ClusterCoordinationProtocolSender.java
index 986231e..b5485cc 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ClusterCoordinationProtocolSender.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/ClusterCoordinationProtocolSender.java
@@ -19,6 +19,7 @@ package org.apache.nifi.cluster.protocol;
 import java.util.Set;
 
 import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
+import org.apache.nifi.cluster.protocol.message.OffloadMessage;
 import org.apache.nifi.cluster.protocol.message.DisconnectMessage;
 import org.apache.nifi.cluster.protocol.message.NodeStatusChangeMessage;
 import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage;
@@ -41,6 +42,14 @@ public interface ClusterCoordinationProtocolSender {
     ReconnectionResponseMessage requestReconnection(ReconnectionRequestMessage 
msg) throws ProtocolException;
 
     /**
+     * Sends an "offload request" message to a node.
+     *
+     * @param msg a message
+     * @throws ProtocolException if communication failed
+     */
+    void offload(OffloadMessage msg) throws ProtocolException;
+
+    /**
      * Sends a "disconnection request" message to a node.
      *
      * @param msg a message

http://git-wip-us.apache.org/repos/asf/nifi/blob/04d8da8f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterCoordinationProtocolSenderListener.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterCoordinationProtocolSenderListener.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterCoordinationProtocolSenderListener.java
index ae3a0e5..74cc6b4 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterCoordinationProtocolSenderListener.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/ClusterCoordinationProtocolSenderListener.java
@@ -26,6 +26,7 @@ import org.apache.nifi.cluster.protocol.NodeIdentifier;
 import org.apache.nifi.cluster.protocol.ProtocolException;
 import org.apache.nifi.cluster.protocol.ProtocolHandler;
 import org.apache.nifi.cluster.protocol.ProtocolListener;
+import org.apache.nifi.cluster.protocol.message.OffloadMessage;
 import org.apache.nifi.cluster.protocol.message.DisconnectMessage;
 import org.apache.nifi.cluster.protocol.message.NodeStatusChangeMessage;
 import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage;
@@ -101,6 +102,11 @@ public class ClusterCoordinationProtocolSenderListener 
implements ClusterCoordin
     }
 
     @Override
+    public void offload(OffloadMessage msg) throws ProtocolException {
+        sender.offload(msg);
+    }
+
+    @Override
     public void disconnect(DisconnectMessage msg) throws ProtocolException {
         sender.disconnect(msg);
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/04d8da8f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/SocketProtocolListener.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/SocketProtocolListener.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/SocketProtocolListener.java
index 9eaffd3..c588a68 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/SocketProtocolListener.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/SocketProtocolListener.java
@@ -24,6 +24,7 @@ import org.apache.nifi.cluster.protocol.ProtocolListener;
 import org.apache.nifi.cluster.protocol.ProtocolMessageMarshaller;
 import org.apache.nifi.cluster.protocol.ProtocolMessageUnmarshaller;
 import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage;
+import org.apache.nifi.cluster.protocol.message.OffloadMessage;
 import org.apache.nifi.cluster.protocol.message.DisconnectMessage;
 import org.apache.nifi.cluster.protocol.message.FlowRequestMessage;
 import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
@@ -210,6 +211,8 @@ public class SocketProtocolListener extends SocketListener 
implements ProtocolLi
                 return ((ConnectionRequestMessage) 
message).getConnectionRequest().getProposedNodeIdentifier();
             case HEARTBEAT:
                 return ((HeartbeatMessage) 
message).getHeartbeat().getNodeIdentifier();
+            case OFFLOAD_REQUEST:
+                return ((OffloadMessage) message).getNodeId();
             case DISCONNECTION_REQUEST:
                 return ((DisconnectMessage) message).getNodeId();
             case FLOW_REQUEST:

http://git-wip-us.apache.org/repos/asf/nifi/blob/04d8da8f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/StandardClusterCoordinationProtocolSender.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/StandardClusterCoordinationProtocolSender.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/StandardClusterCoordinationProtocolSender.java
index 167ddec..b21068f 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/StandardClusterCoordinationProtocolSender.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/StandardClusterCoordinationProtocolSender.java
@@ -36,6 +36,7 @@ import org.apache.nifi.cluster.protocol.ProtocolContext;
 import org.apache.nifi.cluster.protocol.ProtocolException;
 import org.apache.nifi.cluster.protocol.ProtocolMessageMarshaller;
 import org.apache.nifi.cluster.protocol.ProtocolMessageUnmarshaller;
+import org.apache.nifi.cluster.protocol.message.OffloadMessage;
 import org.apache.nifi.cluster.protocol.message.DisconnectMessage;
 import 
org.apache.nifi.cluster.protocol.message.NodeConnectionStatusRequestMessage;
 import 
org.apache.nifi.cluster.protocol.message.NodeConnectionStatusResponseMessage;
@@ -129,6 +130,31 @@ public class StandardClusterCoordinationProtocolSender 
implements ClusterCoordin
     }
 
     /**
+     * Requests a node to be offloaded. The configured value for
+     * handshake timeout is applied to the socket before making the request.
+     *
+     * @param msg a message
+     * @throws ProtocolException if the message failed to be sent
+     */
+    @Override
+    public void offload(final OffloadMessage msg) throws ProtocolException {
+        Socket socket = null;
+        try {
+            socket = createSocket(msg.getNodeId(), true);
+
+            // marshal message to output stream
+            try {
+                final ProtocolMessageMarshaller<ProtocolMessage> marshaller = 
protocolContext.createMarshaller();
+                marshaller.marshal(msg, socket.getOutputStream());
+            } catch (final IOException ioe) {
+                throw new ProtocolException("Failed marshalling '" + 
msg.getType() + "' protocol message due to: " + ioe, ioe);
+            }
+        } finally {
+            SocketUtils.closeQuietly(socket);
+        }
+    }
+
+    /**
      * Requests a node to disconnect from the cluster. The configured value for
      * handshake timeout is applied to the socket before making the request.
      *

http://git-wip-us.apache.org/repos/asf/nifi/blob/04d8da8f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedNodeConnectionStatus.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedNodeConnectionStatus.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedNodeConnectionStatus.java
index c8c4acf..5eae83e 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedNodeConnectionStatus.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/AdaptedNodeConnectionStatus.java
@@ -17,6 +17,7 @@
 
 package org.apache.nifi.cluster.protocol.jaxb.message;
 
+import org.apache.nifi.cluster.coordination.node.OffloadCode;
 import org.apache.nifi.cluster.coordination.node.DisconnectionCode;
 import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
 import org.apache.nifi.cluster.protocol.NodeIdentifier;
@@ -25,8 +26,9 @@ public class AdaptedNodeConnectionStatus {
     private Long updateId;
     private NodeIdentifier nodeId;
     private NodeConnectionState state;
+    private OffloadCode offloadCode;
     private DisconnectionCode disconnectCode;
-    private String disconnectReason;
+    private String reason;
     private Long connectionRequestTime;
 
     public Long getUpdateId() {
@@ -53,20 +55,28 @@ public class AdaptedNodeConnectionStatus {
         this.state = state;
     }
 
+    public OffloadCode getOffloadCode() {
+        return offloadCode;
+    }
+
     public DisconnectionCode getDisconnectCode() {
         return disconnectCode;
     }
 
+    public void setOffloadCode(OffloadCode offloadCode) {
+        this.offloadCode = offloadCode;
+    }
+
     public void setDisconnectCode(DisconnectionCode disconnectCode) {
         this.disconnectCode = disconnectCode;
     }
 
-    public String getDisconnectReason() {
-        return disconnectReason;
+    public String getReason() {
+        return reason;
     }
 
-    public void setDisconnectReason(String disconnectReason) {
-        this.disconnectReason = disconnectReason;
+    public void setReason(String reason) {
+        this.reason = reason;
     }
 
     public Long getConnectionRequestTime() {

http://git-wip-us.apache.org/repos/asf/nifi/blob/04d8da8f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/NodeConnectionStatusAdapter.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/NodeConnectionStatusAdapter.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/NodeConnectionStatusAdapter.java
index ec209de..47e92e8 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/NodeConnectionStatusAdapter.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/NodeConnectionStatusAdapter.java
@@ -28,8 +28,9 @@ public class NodeConnectionStatusAdapter extends 
XmlAdapter<AdaptedNodeConnectio
         return new NodeConnectionStatus(adapted.getUpdateId(),
             adapted.getNodeId(),
             adapted.getState(),
+            adapted.getOffloadCode(),
             adapted.getDisconnectCode(),
-            adapted.getDisconnectReason(),
+            adapted.getReason(),
             adapted.getConnectionRequestTime());
     }
 
@@ -40,8 +41,9 @@ public class NodeConnectionStatusAdapter extends 
XmlAdapter<AdaptedNodeConnectio
             adapted.setUpdateId(toAdapt.getUpdateIdentifier());
             adapted.setNodeId(toAdapt.getNodeIdentifier());
             
adapted.setConnectionRequestTime(toAdapt.getConnectionRequestTime());
+            adapted.setOffloadCode(toAdapt.getOffloadCode());
             adapted.setDisconnectCode(toAdapt.getDisconnectCode());
-            adapted.setDisconnectReason(toAdapt.getDisconnectReason());
+            adapted.setReason(toAdapt.getReason());
             adapted.setState(toAdapt.getState());
         }
         return adapted;

http://git-wip-us.apache.org/repos/asf/nifi/blob/04d8da8f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ObjectFactory.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ObjectFactory.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ObjectFactory.java
index 9a594a4..2f02e5e 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ObjectFactory.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/message/ObjectFactory.java
@@ -20,6 +20,7 @@ import javax.xml.bind.annotation.XmlRegistry;
 
 import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage;
 import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage;
+import org.apache.nifi.cluster.protocol.message.OffloadMessage;
 import org.apache.nifi.cluster.protocol.message.DisconnectMessage;
 import org.apache.nifi.cluster.protocol.message.FlowRequestMessage;
 import org.apache.nifi.cluster.protocol.message.FlowResponseMessage;
@@ -52,6 +53,10 @@ public class ObjectFactory {
         return new ReconnectionResponseMessage();
     }
 
+    public OffloadMessage createDecomissionMessage() {
+        return new OffloadMessage();
+    }
+
     public DisconnectMessage createDisconnectionMessage() {
         return new DisconnectMessage();
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/04d8da8f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/OffloadMessage.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/OffloadMessage.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/OffloadMessage.java
new file mode 100644
index 0000000..a7acd56
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/OffloadMessage.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.protocol.message;
+
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.cluster.protocol.jaxb.message.NodeIdentifierAdapter;
+
+import javax.xml.bind.annotation.XmlRootElement;
+import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
+
+@XmlRootElement(name = "offloadMessage")
+public class OffloadMessage extends ProtocolMessage {
+
+    private NodeIdentifier nodeId;
+    private String explanation;
+
+    @XmlJavaTypeAdapter(NodeIdentifierAdapter.class)
+    public NodeIdentifier getNodeId() {
+        return nodeId;
+    }
+
+    public void setNodeId(NodeIdentifier nodeId) {
+        this.nodeId = nodeId;
+    }
+
+    public String getExplanation() {
+        return explanation;
+    }
+
+    public void setExplanation(String explanation) {
+        this.explanation = explanation;
+    }
+
+    @Override
+    public MessageType getType() {
+        return MessageType.OFFLOAD_REQUEST;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/04d8da8f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ProtocolMessage.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ProtocolMessage.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ProtocolMessage.java
index 482f5d6..fe26c7a 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ProtocolMessage.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/ProtocolMessage.java
@@ -21,6 +21,7 @@ public abstract class ProtocolMessage {
     public static enum MessageType {
         CONNECTION_REQUEST,
         CONNECTION_RESPONSE,
+        OFFLOAD_REQUEST,
         DISCONNECTION_REQUEST,
         EXCEPTION,
         FLOW_REQUEST,

http://git-wip-us.apache.org/repos/asf/nifi/blob/04d8da8f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java
index 4395883..f6d09ab 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.java
@@ -228,6 +228,12 @@ public abstract class AbstractHeartbeatMonitor implements 
HeartbeatMonitor {
             return;
         }
 
+        if (NodeConnectionState.OFFLOADED == connectionState) {
+            // Cluster Coordinator believes that node is offloaded, but let 
the node reconnect
+            clusterCoordinator.reportEvent(nodeId, Severity.INFO, "Received 
heartbeat from node that is offloaded. " +
+                    "Marking as Disconnected and requesting that Node 
reconnect to cluster");
+            clusterCoordinator.requestNodeConnect(nodeId, null);
+        }
         if (NodeConnectionState.DISCONNECTED == connectionState) {
             // ignore heartbeats from nodes disconnected by means other than 
lack of heartbeat, unless it is
             // the only node. We allow it if it is the only node because if we 
have a one-node cluster, then
@@ -249,7 +255,7 @@ public abstract class AbstractHeartbeatMonitor implements 
HeartbeatMonitor {
                 default: {
                     // disconnected nodes should not heartbeat, so we need to 
issue a disconnection request.
                     logger.info("Ignoring received heartbeat from disconnected 
node " + nodeId + ".  Issuing disconnection request.");
-                    clusterCoordinator.requestNodeDisconnect(nodeId, 
disconnectionCode, connectionStatus.getDisconnectReason());
+                    clusterCoordinator.requestNodeDisconnect(nodeId, 
disconnectionCode, connectionStatus.getReason());
                     removeHeartbeat(nodeId);
                     break;
                 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/04d8da8f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
index 85618de..0b2f1fe 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/replication/ThreadPoolRequestReplicator.java
@@ -170,6 +170,24 @@ public class ThreadPoolRequestReplicator implements 
RequestReplicator {
 
         // If the request is mutable, ensure that all nodes are connected.
         if (mutable) {
+            final List<NodeIdentifier> offloaded = 
stateMap.get(NodeConnectionState.OFFLOADED);
+            if (offloaded != null && !offloaded.isEmpty()) {
+                if (offloaded.size() == 1) {
+                    throw new OffloadedNodeMutableRequestException("Node " + 
offloaded.iterator().next() + " is currently offloaded");
+                } else {
+                    throw new 
OffloadedNodeMutableRequestException(offloaded.size() + " Nodes are currently 
offloaded");
+                }
+            }
+
+            final List<NodeIdentifier> offloading = 
stateMap.get(NodeConnectionState.OFFLOADING);
+            if (offloading != null && !offloading.isEmpty()) {
+                if (offloading.size() == 1) {
+                    throw new OffloadedNodeMutableRequestException("Node " + 
offloading.iterator().next() + " is currently offloading");
+                } else {
+                    throw new 
OffloadedNodeMutableRequestException(offloading.size() + " Nodes are currently 
offloading");
+                }
+            }
+
             final List<NodeIdentifier> disconnected = 
stateMap.get(NodeConnectionState.DISCONNECTED);
             if (disconnected != null && !disconnected.isEmpty()) {
                 if (disconnected.size() == 1) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/04d8da8f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
index b24475e..e165041 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/node/NodeClusterCoordinator.java
@@ -34,6 +34,7 @@ import org.apache.nifi.cluster.event.NodeEvent;
 import org.apache.nifi.cluster.exception.NoClusterCoordinatorException;
 import org.apache.nifi.cluster.firewall.ClusterNodeFirewall;
 import org.apache.nifi.cluster.manager.NodeResponse;
+import org.apache.nifi.cluster.manager.exception.IllegalNodeOffloadException;
 import 
org.apache.nifi.cluster.manager.exception.IllegalNodeDisconnectionException;
 import org.apache.nifi.cluster.protocol.ComponentRevision;
 import org.apache.nifi.cluster.protocol.ConnectionRequest;
@@ -49,6 +50,7 @@ import 
org.apache.nifi.cluster.protocol.message.ClusterWorkloadRequestMessage;
 import org.apache.nifi.cluster.protocol.message.ClusterWorkloadResponseMessage;
 import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage;
 import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage;
+import org.apache.nifi.cluster.protocol.message.OffloadMessage;
 import org.apache.nifi.cluster.protocol.message.DisconnectMessage;
 import 
org.apache.nifi.cluster.protocol.message.NodeConnectionStatusResponseMessage;
 import org.apache.nifi.cluster.protocol.message.NodeStatusChangeMessage;
@@ -431,7 +433,7 @@ public class NodeClusterCoordinator implements 
ClusterCoordinator, ProtocolHandl
             reportEvent(nodeId, Severity.INFO, "Requesting that node connect 
to cluster on behalf of " + userDn);
         }
 
-        updateNodeStatus(new NodeConnectionStatus(nodeId, 
NodeConnectionState.CONNECTING, null, null, System.currentTimeMillis()));
+        updateNodeStatus(new NodeConnectionStatus(nodeId, 
NodeConnectionState.CONNECTING, null, null, null, System.currentTimeMillis()));
 
         // create the request
         final ReconnectionRequestMessage request = new 
ReconnectionRequestMessage();
@@ -470,6 +472,50 @@ public class NodeClusterCoordinator implements 
ClusterCoordinator, ProtocolHandl
     }
 
     @Override
+    public void finishNodeOffload(final NodeIdentifier nodeId) {
+        final NodeConnectionState state = getConnectionState(nodeId);
+        if (state == null) {
+            logger.warn("Attempted to finish node offload for {} but node is 
not known.", nodeId);
+            return;
+        }
+
+        if (state != NodeConnectionState.OFFLOADING) {
+            logger.warn("Attempted to finish node offload for {} but node is 
not in a offload state, it is currently {}.", nodeId, state);
+            return;
+        }
+
+        logger.info("{} is now offloaded", nodeId);
+        updateNodeStatus(new NodeConnectionStatus(nodeId, 
NodeConnectionState.OFFLOADED));
+    }
+
+    @Override
+    public void requestNodeOffload(final NodeIdentifier nodeId, final 
OffloadCode offloadCode, final String explanation) {
+        final Set<NodeIdentifier> offloadNodeIds = 
getNodeIdentifiers(NodeConnectionState.OFFLOADING, 
NodeConnectionState.OFFLOADED);
+        if (offloadNodeIds.contains(nodeId)) {
+            logger.debug("Attempted to offload node but the node is already 
offloading or offloaded");
+            // no need to do anything here, the node is currently offloading 
or already offloaded
+            return;
+        }
+
+        final Set<NodeIdentifier> disconnectedNodeIds = 
getNodeIdentifiers(NodeConnectionState.DISCONNECTED);
+        if (!disconnectedNodeIds.contains(nodeId)) {
+            throw new IllegalNodeOffloadException("Cannot offload node " + 
nodeId + " because it is not currently disconnected");
+        }
+
+        logger.info("Requesting that {} is offloaded due to {}", nodeId, 
explanation == null ? offloadCode : explanation);
+
+        updateNodeStatus(new NodeConnectionStatus(nodeId, 
NodeConnectionState.OFFLOADING, offloadCode, explanation));
+
+        final OffloadMessage request = new OffloadMessage();
+        request.setNodeId(nodeId);
+        request.setExplanation(explanation);
+
+        addNodeEvent(nodeId, "Offload requested due to " + explanation);
+        onNodeOffloaded(nodeId);
+        offloadAsynchronously(request, 10, 5);
+    }
+
+    @Override
     public void requestNodeDisconnect(final NodeIdentifier nodeId, final 
DisconnectionCode disconnectionCode, final String explanation) {
         final Set<NodeIdentifier> connectedNodeIds = 
getNodeIdentifiers(NodeConnectionState.CONNECTED);
         if (connectedNodeIds.size() == 1 && connectedNodeIds.contains(nodeId)) 
{
@@ -526,17 +572,19 @@ public class NodeClusterCoordinator implements 
ClusterCoordinator, ProtocolHandl
         storeState();
     }
 
+    private void onNodeOffloaded(final NodeIdentifier nodeId) {
+        eventListeners.forEach(listener -> listener.onNodeOffloaded(nodeId));
+    }
+
     private void onNodeRemoved(final NodeIdentifier nodeId) {
-        eventListeners.stream().forEach(listener -> 
listener.onNodeRemoved(nodeId));
+        eventListeners.forEach(listener -> listener.onNodeRemoved(nodeId));
     }
 
     private void onNodeAdded(final NodeIdentifier nodeId, final boolean 
storeState) {
         if (storeState) {
             storeState();
         }
-
-
-        eventListeners.stream().forEach(listener -> 
listener.onNodeAdded(nodeId));
+        eventListeners.forEach(listener -> listener.onNodeAdded(nodeId));
     }
 
     @Override
@@ -821,7 +869,7 @@ public class NodeClusterCoordinator implements 
ClusterCoordinator, ProtocolHandl
         // Otherwise, get the active coordinator (or wait for one to become 
active) and then notify the coordinator.
         final Set<NodeIdentifier> nodesToNotify;
         if (notifyAllNodes) {
-            nodesToNotify = getNodeIdentifiers(NodeConnectionState.CONNECTED, 
NodeConnectionState.CONNECTING);
+            nodesToNotify = getNodeIdentifiers();
 
             // Do not notify ourselves because we already know about the 
status update.
             nodesToNotify.remove(getLocalNodeIdentifier());
@@ -841,6 +889,34 @@ public class NodeClusterCoordinator implements 
ClusterCoordinator, ProtocolHandl
         senderListener.notifyNodeStatusChange(nodesToNotify, message);
     }
 
+    private void offloadAsynchronously(final OffloadMessage request, final int 
attempts, final int retrySeconds) {
+        final Thread offloadThread = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                final NodeIdentifier nodeId = request.getNodeId();
+
+                for (int i = 0; i < attempts; i++) {
+                    try {
+                        senderListener.offload(request);
+                        reportEvent(nodeId, Severity.INFO, "Node was offloaded 
due to " + request.getExplanation());
+                        return;
+                    } catch (final Exception e) {
+                        logger.error("Failed to notify {} that it has been 
offloaded due to {}", request.getNodeId(), request.getExplanation(), e);
+
+                        try {
+                            Thread.sleep(retrySeconds * 1000L);
+                        } catch (final InterruptedException ie) {
+                            Thread.currentThread().interrupt();
+                            return;
+                        }
+                    }
+                }
+            }
+        }, "Offload " + request.getNodeId());
+
+        offloadThread.start();
+    }
+
     private void disconnectAsynchronously(final DisconnectMessage request, 
final int attempts, final int retrySeconds) {
         final Thread disconnectThread = new Thread(new Runnable() {
             @Override
@@ -961,8 +1037,8 @@ public class NodeClusterCoordinator implements 
ClusterCoordinator, ProtocolHandl
 
         if (oldStatus == null || status.getState() != oldStatus.getState()) {
             sb.append("Node Status changed from ").append(oldStatus == null ? 
"[Unknown Node]" : oldStatus.getState().toString()).append(" to 
").append(status.getState().toString());
-            if (status.getDisconnectReason() != null) {
-                sb.append(" due to ").append(status.getDisconnectReason());
+            if (status.getReason() != null) {
+                sb.append(" due to ").append(status.getReason());
             } else if (status.getDisconnectCode() != null) {
                 sb.append(" due to 
").append(status.getDisconnectCode().toString());
             }
@@ -1118,7 +1194,7 @@ public class NodeClusterCoordinator implements 
ClusterCoordinator, ProtocolHandl
             addNodeEvent(resolvedNodeIdentifier, "Connection requested from 
existing node. Setting status to connecting.");
         }
 
-        status = new NodeConnectionStatus(resolvedNodeIdentifier, 
NodeConnectionState.CONNECTING, null, null, System.currentTimeMillis());
+        status = new NodeConnectionStatus(resolvedNodeIdentifier, 
NodeConnectionState.CONNECTING, null, null, null, System.currentTimeMillis());
         updateNodeStatus(status);
 
         final ConnectionResponse response = new 
ConnectionResponse(resolvedNodeIdentifier, clusterDataFlow, instanceId, 
getConnectionStatuses(),

http://git-wip-us.apache.org/repos/asf/nifi/blob/04d8da8f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/IllegalNodeOffloadException.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/IllegalNodeOffloadException.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/IllegalNodeOffloadException.java
new file mode 100644
index 0000000..f1bc669
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/IllegalNodeOffloadException.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.manager.exception;
+
+/**
+ * Represents the exceptional case when an offload request is issued to a node 
that cannot be offloaded (e.g., not currently disconnected).
+ */
+public class IllegalNodeOffloadException extends IllegalClusterStateException {
+
+    public IllegalNodeOffloadException() {
+    }
+
+    public IllegalNodeOffloadException(String msg) {
+        super(msg);
+    }
+
+    public IllegalNodeOffloadException(Throwable cause) {
+        super(cause);
+    }
+
+    public IllegalNodeOffloadException(String msg, Throwable cause) {
+        super(msg, cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/04d8da8f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/OffloadedNodeMutableRequestException.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/OffloadedNodeMutableRequestException.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/OffloadedNodeMutableRequestException.java
new file mode 100644
index 0000000..3663349
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/exception/OffloadedNodeMutableRequestException.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.cluster.manager.exception;
+
+/**
+ * Represents the exceptional case when an HTTP request that may change a 
node's dataflow is to be replicated while one or more nodes are offloaded.
+ *
+ */
+public class OffloadedNodeMutableRequestException extends 
MutableRequestException {
+
+    public OffloadedNodeMutableRequestException() {
+    }
+
+    public OffloadedNodeMutableRequestException(String msg) {
+        super(msg);
+    }
+
+    public OffloadedNodeMutableRequestException(Throwable cause) {
+        super(cause);
+    }
+
+    public OffloadedNodeMutableRequestException(String msg, Throwable cause) {
+        super(msg, cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/04d8da8f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java
index 6ea019d..4aeff7b 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/heartbeat/TestAbstractHeartbeatMonitor.java
@@ -20,6 +20,7 @@ package org.apache.nifi.cluster.coordination.heartbeat;
 import org.apache.nifi.cluster.ReportedEvent;
 import org.apache.nifi.cluster.coordination.ClusterCoordinator;
 import org.apache.nifi.cluster.coordination.ClusterTopologyEventListener;
+import org.apache.nifi.cluster.coordination.node.OffloadCode;
 import org.apache.nifi.cluster.coordination.node.DisconnectionCode;
 import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
 import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
@@ -245,6 +246,16 @@ public class TestAbstractHeartbeatMonitor {
         }
 
         @Override
+        public synchronized void finishNodeOffload(NodeIdentifier nodeId) {
+            statuses.put(nodeId, new NodeConnectionStatus(nodeId, 
NodeConnectionState.OFFLOADED));
+        }
+
+        @Override
+        public synchronized void requestNodeOffload(NodeIdentifier nodeId, 
OffloadCode offloadCode, String explanation) {
+            statuses.put(nodeId, new NodeConnectionStatus(nodeId, 
NodeConnectionState.OFFLOADED));
+        }
+
+        @Override
         public synchronized void requestNodeDisconnect(NodeIdentifier nodeId, 
DisconnectionCode disconnectionCode, String explanation) {
             statuses.put(nodeId, new NodeConnectionStatus(nodeId, 
NodeConnectionState.DISCONNECTED));
         }

http://git-wip-us.apache.org/repos/asf/nifi/blob/04d8da8f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java
index fb06a15..5ce2985 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/test/java/org/apache/nifi/cluster/coordination/node/TestNodeClusterCoordinator.java
@@ -280,7 +280,7 @@ public class TestNodeClusterCoordinator {
         assertNotNull(statusChange);
         assertEquals(createNodeId(1), statusChange.getNodeIdentifier());
         assertEquals(DisconnectionCode.NODE_SHUTDOWN, 
statusChange.getDisconnectCode());
-        assertEquals("Unit Test", statusChange.getDisconnectReason());
+        assertEquals("Unit Test", statusChange.getReason());
     }
 
     @Test
@@ -407,7 +407,7 @@ public class TestNodeClusterCoordinator {
         nodeStatuses.clear();
 
         final NodeConnectionStatus oldStatus = new NodeConnectionStatus(-1L, 
nodeId1, NodeConnectionState.DISCONNECTED,
-            DisconnectionCode.BLOCKED_BY_FIREWALL, null, 0L);
+            null, DisconnectionCode.BLOCKED_BY_FIREWALL, null, 0L);
         final NodeStatusChangeMessage msg = new NodeStatusChangeMessage();
         msg.setNodeId(nodeId1);
         msg.setNodeConnectionStatus(oldStatus);

http://git-wip-us.apache.org/repos/asf/nifi/blob/04d8da8f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
index 7a5c45e..297595f 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
@@ -23,6 +23,7 @@ import org.apache.nifi.authorization.ManagedAuthorizer;
 import org.apache.nifi.bundle.Bundle;
 import org.apache.nifi.cluster.ConnectionException;
 import org.apache.nifi.cluster.coordination.ClusterCoordinator;
+import org.apache.nifi.cluster.coordination.node.OffloadCode;
 import org.apache.nifi.cluster.coordination.node.DisconnectionCode;
 import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
 import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
@@ -36,6 +37,7 @@ import org.apache.nifi.cluster.protocol.ProtocolHandler;
 import org.apache.nifi.cluster.protocol.StandardDataFlow;
 import org.apache.nifi.cluster.protocol.impl.NodeProtocolSenderListener;
 import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage;
+import org.apache.nifi.cluster.protocol.message.OffloadMessage;
 import org.apache.nifi.cluster.protocol.message.DisconnectMessage;
 import org.apache.nifi.cluster.protocol.message.FlowRequestMessage;
 import org.apache.nifi.cluster.protocol.message.FlowResponseMessage;
@@ -44,12 +46,14 @@ import 
org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage;
 import org.apache.nifi.cluster.protocol.message.ReconnectionResponseMessage;
 import org.apache.nifi.components.state.Scope;
 import org.apache.nifi.components.state.StateManager;
+import org.apache.nifi.controller.queue.FlowFileQueue;
 import org.apache.nifi.controller.serialization.FlowSerializationException;
 import org.apache.nifi.controller.serialization.FlowSynchronizationException;
 import org.apache.nifi.encrypt.StringEncryptor;
 import org.apache.nifi.engine.FlowEngine;
 import org.apache.nifi.events.BulletinFactory;
 import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.groups.RemoteProcessGroup;
 import org.apache.nifi.lifecycle.LifeCycleStartException;
 import org.apache.nifi.logging.LogLevel;
 import org.apache.nifi.nar.NarClassLoaders;
@@ -381,6 +385,7 @@ public class StandardFlowService implements FlowService, 
ProtocolHandler {
     public boolean canHandle(final ProtocolMessage msg) {
         switch (msg.getType()) {
             case RECONNECTION_REQUEST:
+            case OFFLOAD_REQUEST:
             case DISCONNECTION_REQUEST:
             case FLOW_REQUEST:
                 return true;
@@ -415,6 +420,22 @@ public class StandardFlowService implements FlowService, 
ProtocolHandler {
 
                     return new ReconnectionResponseMessage();
                 }
+                case OFFLOAD_REQUEST: {
+                    // Offloading waits for this node's queues to drain, so it runs on its own daemon thread
+                    // and the handler returns immediately without a response message.
+                    final Thread t = new Thread(new Runnable() {
+                        @Override
+                        public void run() {
+                            try {
+                                handleOffloadRequest((OffloadMessage) request);
+                            } catch (InterruptedException e) {
+                                // An exception thrown from run() never reaches the sender of the request, so
+                                // restore the interrupt flag and log the failure instead of throwing.
+                                Thread.currentThread().interrupt();
+                                logger.error("Could not complete offload request", e);
+                            }
+                        }
+                    }, "Offload Flow Files from Node");
+                    t.setDaemon(true);
+                    t.start();
+
+                    return null;
+                }
                 case DISCONNECTION_REQUEST: {
                     final Thread t = new Thread(new Runnable() {
                         @Override
@@ -561,7 +582,7 @@ public class StandardFlowService implements FlowService, 
ProtocolHandler {
     private FlowResponseMessage handleFlowRequest(final FlowRequestMessage 
request) throws ProtocolException {
         readLock.lock();
         try {
-            logger.info("Received flow request message from manager.");
+            logger.info("Received flow request message from cluster 
coordinator.");
 
             // create the response
             final FlowResponseMessage response = new FlowResponseMessage();
@@ -631,7 +652,7 @@ public class StandardFlowService implements FlowService, 
ProtocolHandler {
 
     private void handleReconnectionRequest(final ReconnectionRequestMessage 
request) {
         try {
-            logger.info("Processing reconnection request from manager.");
+            logger.info("Processing reconnection request from cluster 
coordinator.");
 
             // reconnect
             ConnectionResponse connectionResponse = new 
ConnectionResponse(getNodeId(), request.getDataFlow(),
@@ -662,8 +683,48 @@ public class StandardFlowService implements FlowService, 
ProtocolHandler {
         }
     }
 
+    /**
+     * Logs the offload request received from the cluster coordinator and then offloads this node.
+     *
+     * @param request the offload message; its explanation is logged and handed to {@code offload}
+     * @throws InterruptedException if {@code offload} is interrupted
+     */
+    private void handleOffloadRequest(final OffloadMessage request) throws InterruptedException {
+        final String explanation = request.getExplanation();
+        logger.info("Received offload request message from cluster coordinator with explanation: " + explanation);
+        offload(explanation);
+    }
+
+    /**
+     * Offloads this node while holding the write lock: marks the node OFFLOADING, asks all processors to stop and
+     * all remote process groups to stop transmitting, terminates processors that are STOPPED, asks every queue to
+     * offload, waits until the controller reports no queued FlowFiles, then marks the node OFFLOADED and notifies
+     * the cluster coordinator.
+     *
+     * @param explanation the reason for the offload; logged and recorded in the connection status
+     * @throws InterruptedException if the thread is interrupted while waiting for the queues to empty
+     */
+    private void offload(final String explanation) throws InterruptedException {
+        writeLock.lock();
+        try {
+            logger.info("Offloading node due to " + explanation);
+
+            // publish the OFFLOADING state before touching any components
+            final NodeConnectionStatus offloadingStatus = new NodeConnectionStatus(nodeId, NodeConnectionState.OFFLOADING, OffloadCode.OFFLOADED, explanation);
+            controller.setConnectionStatus(offloadingStatus);
+
+            // ask every processor on the node to stop
+            controller.stopAllProcessors();
+
+            // ask every remote process group to stop transmitting
+            controller.getRootGroup().findAllRemoteProcessGroups().forEach(remoteGroup -> remoteGroup.stopTransmitting());
+
+            // terminate processors; only those already in the STOPPED state can be terminated
+            controller.getRootGroup().findAllProcessors().stream().forEach(processor -> {
+                if (processor.getScheduledState() == ScheduledState.STOPPED) {
+                    processor.getProcessGroup().terminateProcessor(processor);
+                }
+            });
+
+            // switch every queue on this node into offload mode
+            controller.getAllQueues().forEach(queue -> queue.offloadQueue());
+
+            // poll once a second until the controller reports that nothing is queued
+            while (controller.getControllerStatus().getQueuedCount() > 0) {
+                logger.debug("Offloading queues on node {}, remaining queued count: {}", getNodeId(), controller.getControllerStatus().getQueuedCount());
+                Thread.sleep(1000);
+            }
+
+            // record completion locally, then tell the cluster coordinator
+            final NodeConnectionStatus offloadedStatus = new NodeConnectionStatus(nodeId, NodeConnectionState.OFFLOADED, OffloadCode.OFFLOADED, explanation);
+            controller.setConnectionStatus(offloadedStatus);
+            clusterCoordinator.finishNodeOffload(getNodeId());
+
+            logger.info("Node offloaded due to " + explanation);
+        } finally {
+            writeLock.unlock();
+        }
+    }
+
     private void handleDisconnectionRequest(final DisconnectMessage request) {
-        logger.info("Received disconnection request message from manager with 
explanation: " + request.getExplanation());
+        logger.info("Received disconnection request message from cluster 
coordinator with explanation: " + request.getExplanation());
         disconnect(request.getExplanation());
     }
 
@@ -829,11 +890,11 @@ public class StandardFlowService implements FlowService, 
ProtocolHandler {
                         }
                     } else if (response.getRejectionReason() != null) {
                         logger.warn("Connection request was blocked by cluster 
coordinator with the explanation: " + response.getRejectionReason());
-                        // set response to null and treat a firewall blockage 
the same as getting no response from manager
+                        // set response to null and treat a firewall blockage 
the same as getting no response from cluster coordinator
                         response = null;
                         break;
                     } else {
-                        // we received a successful connection response from 
manager
+                        // we received a successful connection response from 
cluster coordinator
                         break;
                     }
                 } catch (final NoClusterCoordinatorException ncce) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/04d8da8f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/StandardFlowFileQueue.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/StandardFlowFileQueue.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/StandardFlowFileQueue.java
index cab41e8..ee222f4 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/StandardFlowFileQueue.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/StandardFlowFileQueue.java
@@ -75,6 +75,10 @@ public class StandardFlowFileQueue extends 
AbstractFlowFileQueue implements Flow
     }
 
     @Override
+    public void offloadQueue() {
+        // Intentionally empty. NOTE(review): presumably there is nothing to offload for this queue type, which also reports isActivelyLoadBalancing() == false - confirm.
+    }
+
+    @Override
     public boolean isActivelyLoadBalancing() {
         return false;
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/04d8da8f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java
index 193a961..4c9188b 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java
@@ -40,6 +40,7 @@ import 
org.apache.nifi.controller.queue.clustered.partition.FirstNodePartitioner
 import 
org.apache.nifi.controller.queue.clustered.partition.FlowFilePartitioner;
 import 
org.apache.nifi.controller.queue.clustered.partition.LocalPartitionPartitioner;
 import 
org.apache.nifi.controller.queue.clustered.partition.LocalQueuePartition;
+import 
org.apache.nifi.controller.queue.clustered.partition.NonLocalPartitionPartitioner;
 import org.apache.nifi.controller.queue.clustered.partition.QueuePartition;
 import 
org.apache.nifi.controller.queue.clustered.partition.RebalancingPartition;
 import 
org.apache.nifi.controller.queue.clustered.partition.RemoteQueuePartition;
@@ -113,6 +114,7 @@ public class SocketLoadBalancedFlowFileQueue extends 
AbstractFlowFileQueue imple
     private QueuePartition[] queuePartitions;
     private FlowFilePartitioner partitioner;
     private boolean stopped = true;
+    // Written by offloadQueue() (run on the offload request thread) and read when node identifiers are updated
+    // from cluster topology callbacks; volatile makes the write visible across those threads.
+    private volatile boolean offloaded = false;
 
 
     public SocketLoadBalancedFlowFileQueue(final String identifier, final 
ConnectionEventListener eventListener, final ProcessScheduler scheduler, final 
FlowFileRepository flowFileRepo,
@@ -204,6 +206,19 @@ public class SocketLoadBalancedFlowFileQueue extends 
AbstractFlowFileQueue imple
         setFlowFilePartitioner(partitioner);
     }
 
+    /**
+     * Marks this queue as offloaded and installs a {@link NonLocalPartitionPartitioner}.
+     * Does nothing when there is no cluster coordinator, i.e. the node is not clustered.
+     */
+    @Override
+    public void offloadQueue() {
+        final boolean clustered = clusterCoordinator != null;
+        if (clustered) {
+            offloaded = true;
+
+            // TODO need to be able to reset the partitioner to the previous partitioner if this node is reconnected to the cluster
+            setFlowFilePartitioner(new NonLocalPartitionPartitioner());
+        }
+        // otherwise not clustered, so the partitions are left unchanged
+    }
+
     public synchronized void startLoadBalancing() {
         logger.debug("{} started. Will begin distributing FlowFiles across the 
cluster", this);
 
@@ -551,6 +566,11 @@ public class SocketLoadBalancedFlowFileQueue extends 
AbstractFlowFileQueue imple
                 return;
             }
 
+            if (offloaded) {
+                logger.debug("{} Not going to rebalance Queue even though 
setNodeIdentifiers was called, because the queue has been offloaded", this);
+                return;
+            }
+
             logger.debug("{} Stopping the {} queue partitions in order to 
change node identifiers from {} to {}", this, queuePartitions.length, 
this.nodeIdentifiers, updatedNodeIdentifiers);
             for (final QueuePartition queuePartition : queuePartitions) {
                 queuePartition.stop();
@@ -969,6 +989,20 @@ public class SocketLoadBalancedFlowFileQueue extends 
AbstractFlowFileQueue imple
         }
 
         @Override
+        public void onNodeOffloaded(final NodeIdentifier nodeId) {
+            // Drop the offloaded node from the known node identifiers while holding the partition write lock.
+            partitionWriteLock.lock();
+            try {
+                final Set<NodeIdentifier> remainingNodeIds = new HashSet<>(nodeIdentifiers);
+                remainingNodeIds.remove(nodeId);
+
+                logger.debug("Node Identifier {} offloaded. Node ID's changing from {} to {}", nodeId, nodeIdentifiers, remainingNodeIds);
+                // second argument is passed as false, exactly as before
+                setNodeIdentifiers(remainingNodeIds, false);
+            } finally {
+                partitionWriteLock.unlock();
+            }
+        }
+
+        @Override
         public void onNodeRemoved(final NodeIdentifier nodeId) {
             partitionWriteLock.lock();
             try {

http://git-wip-us.apache.org/repos/asf/nifi/blob/04d8da8f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/NonLocalPartitionPartitioner.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/NonLocalPartitionPartitioner.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/NonLocalPartitionPartitioner.java
new file mode 100644
index 0000000..cffaefd
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/NonLocalPartitionPartitioner.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.queue.clustered.partition;
+
+import org.apache.nifi.controller.repository.FlowFileRecord;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Returns remote partitions when queried for a partition; never returns the {@link LocalQueuePartition}.
+ * Candidates are scanned round-robin, starting from a shared counter that advances on every call.
+ */
+public class NonLocalPartitionPartitioner implements FlowFilePartitioner {
+    private final AtomicLong counter = new AtomicLong(0L);
+
+    /**
+     * Picks the next partition, in round-robin order, that is not the local partition.
+     *
+     * @param flowFile the FlowFile being routed; not consulted by this partitioner
+     * @param partitions the candidate partitions
+     * @param localPartition the partition that must never be returned
+     * @return the first partition at or after the round-robin start index that is not equal to {@code localPartition}
+     * @throws IllegalStateException if there are no candidates or every candidate equals {@code localPartition}
+     */
+    @Override
+    public QueuePartition getPartition(final FlowFileRecord flowFile, final QueuePartition[] partitions, final QueuePartition localPartition) {
+        final long startIndex = counter.getAndIncrement();
+        final int numPartitions = partitions.length;
+        for (int i = 0; i < numPartitions; ++i) {
+            // floorMod keeps the index non-negative even after the counter wraps past Long.MAX_VALUE,
+            // where the % operator would yield a negative array index
+            final int index = (int) Math.floorMod(startIndex + i, (long) numPartitions);
+            final QueuePartition partition = partitions[index];
+            if (!partition.equals(localPartition)) {
+                return partition;
+            }
+        }
+
+        throw new IllegalStateException("Could not determine a remote partition");
+    }
+
+    @Override
+    public boolean isRebalanceOnClusterResize() {
+        return false;
+    }
+
+    @Override
+    public boolean isRebalanceOnFailure() {
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/04d8da8f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
index 402ce06..878ad13 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
@@ -110,6 +110,10 @@ public class TestWriteAheadFlowFileRepository {
             }
 
             @Override
+            public void offloadQueue() {
+                // Empty stub: performs no work.
+            }
+
+            @Override
             public boolean isActivelyLoadBalancing() {
                 return false;
             }

http://git-wip-us.apache.org/repos/asf/nifi/blob/04d8da8f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
index b8ac88a..4ef241e 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
@@ -47,6 +47,7 @@ import 
org.apache.nifi.cluster.coordination.ClusterCoordinator;
 import org.apache.nifi.cluster.coordination.heartbeat.HeartbeatMonitor;
 import org.apache.nifi.cluster.coordination.heartbeat.NodeHeartbeat;
 import org.apache.nifi.cluster.coordination.node.ClusterRoles;
+import org.apache.nifi.cluster.coordination.node.OffloadCode;
 import org.apache.nifi.cluster.coordination.node.DisconnectionCode;
 import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
 import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
@@ -1124,6 +1125,9 @@ public class StandardNiFiServiceFacade implements 
NiFiServiceFacade {
 
         if 
(NodeConnectionState.CONNECTING.name().equalsIgnoreCase(nodeDTO.getStatus())) {
             clusterCoordinator.requestNodeConnect(nodeId, userDn);
+        } else if 
(NodeConnectionState.OFFLOADING.name().equalsIgnoreCase(nodeDTO.getStatus())) {
+            clusterCoordinator.requestNodeOffload(nodeId, 
OffloadCode.OFFLOADED,
+                    "User " + userDn + " requested that node be offloaded");
         } else if 
(NodeConnectionState.DISCONNECTING.name().equalsIgnoreCase(nodeDTO.getStatus()))
 {
             clusterCoordinator.requestNodeDisconnect(nodeId, 
DisconnectionCode.USER_DISCONNECTED,
                     "User " + userDn + " requested that node be disconnected 
from cluster");
@@ -4702,7 +4706,7 @@ public class StandardNiFiServiceFacade implements 
NiFiServiceFacade {
         }
 
         final NodeConnectionStatus nodeConnectionStatus = 
clusterCoordinator.getConnectionStatus(nodeIdentifier);
-        if 
(!nodeConnectionStatus.getState().equals(NodeConnectionState.DISCONNECTED)) {
+        if 
(!nodeConnectionStatus.getState().equals(NodeConnectionState.OFFLOADED) && 
!nodeConnectionStatus.getState().equals(NodeConnectionState.DISCONNECTED)) {
             throw new IllegalNodeDeletionException("Cannot remove Node with ID 
" + nodeId + " because it is not disconnected, current state = " + 
nodeConnectionStatus.getState());
         }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/04d8da8f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/config/IllegalNodeOffloadExceptionMapper.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/config/IllegalNodeOffloadExceptionMapper.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/config/IllegalNodeOffloadExceptionMapper.java
new file mode 100644
index 0000000..890a8a6
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/config/IllegalNodeOffloadExceptionMapper.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.web.api.config;
+
+import org.apache.nifi.cluster.manager.exception.IllegalNodeOffloadException;
+import org.apache.nifi.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.core.Response;
+import javax.ws.rs.ext.ExceptionMapper;
+import javax.ws.rs.ext.Provider;
+
+/**
+ * JAX-RS provider that turns an {@link IllegalNodeOffloadException} into a plain-text Conflict response
+ * carrying the exception message.
+ */
+@Provider
+public class IllegalNodeOffloadExceptionMapper implements ExceptionMapper<IllegalNodeOffloadException> {
+
+    private static final Logger logger = LoggerFactory.getLogger(IllegalNodeOffloadExceptionMapper.class);
+
+    @Override
+    public Response toResponse(IllegalNodeOffloadException exception) {
+        final Response.Status status = Response.Status.CONFLICT;
+
+        // One-line summary at INFO; the exception object itself is only logged when DEBUG is enabled.
+        logger.info(String.format("%s. Returning %s response.", exception, status));
+        if (logger.isDebugEnabled()) {
+            logger.debug(StringUtils.EMPTY, exception);
+        }
+
+        final String body = exception.getMessage();
+        return Response.status(status).entity(body).type("text/plain").build();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/04d8da8f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/frontend/package.json
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/frontend/package.json
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/frontend/package.json
index eff9e33..0bbd0ed 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/frontend/package.json
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/frontend/package.json
@@ -30,7 +30,7 @@
     "d3-selection-multi": "1.0.1",
     "jquery-minicolors": "2.1.10",
     "jquery-ui-dist": "1.12.1",
-    "font-awesome": "4.6.1",
+    "font-awesome": "4.7.0",
     "jquery": "3.1.1",
     "reset.css": "2.0.2",
     "jquery-form": "3.50.0",

Reply via email to