http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/FirstNodePartitioner.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/FirstNodePartitioner.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/FirstNodePartitioner.java
new file mode 100644
index 0000000..10d584e
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/FirstNodePartitioner.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.queue.clustered.partition;
+
+import org.apache.nifi.controller.repository.FlowFileRecord;
+
+/**
+ * A {@link FlowFilePartitioner} that always selects the first element of the array of partitions that it is given.
+ */
+public class FirstNodePartitioner implements FlowFilePartitioner {
+
+    /**
+     * Selects the partition at index 0 of the given array. Neither the FlowFile nor the local partition is consulted.
+     *
+     * @param flowFile the FlowFile to partition (not consulted)
+     * @param partitions the partitions to choose from
+     * @param localPartition the local partition (not consulted)
+     * @return the first element of <code>partitions</code>
+     */
+    @Override
+    public QueuePartition getPartition(final FlowFileRecord flowFile, final QueuePartition[] partitions, final QueuePartition localPartition) {
+        final QueuePartition firstPartition = partitions[0];
+        return firstPartition;
+    }
+
+    /**
+     * @return <code>true</code>, always
+     */
+    @Override
+    public boolean isRebalanceOnClusterResize() {
+        return true;
+    }
+
+    /**
+     * @return <code>false</code>, always
+     */
+    @Override
+    public boolean isRebalanceOnFailure() {
+        return false;
+    }
+
+    /**
+     * @return <code>true</code>, always: the chosen partition does not depend on the FlowFile that is passed
+     */
+    @Override
+    public boolean isPartitionStatic() {
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/FlowFilePartitioner.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/FlowFilePartitioner.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/FlowFilePartitioner.java
new file mode 100644
index 0000000..4f528e4
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/FlowFilePartitioner.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.queue.clustered.partition;
+
+import org.apache.nifi.controller.repository.FlowFileRecord;
+
+public interface FlowFilePartitioner {
+
+    /**
+     * Determines which partition the given FlowFile should go to.
+     *
+     * @param flowFile the FlowFile to partition
+     * @param partitions the partitions to choose from
+     * @param localPartition the local partition, which is also included in the given array of partitions
+     * @return the partition for the FlowFile
+     */
+    QueuePartition getPartition(FlowFileRecord flowFile, QueuePartition[] 
partitions,  QueuePartition localPartition);
+
+    /**
+     * @return <code>true</code> if a change in the size of a cluster should result in re-balancing all FlowFiles in queue,
+     *         <code>false</code> if a change in the size of a cluster does not require re-balancing.
+     */
+    boolean isRebalanceOnClusterResize();
+
+    /**
+     * @return <code>true</code> if FlowFiles should be rebalanced to another partition if they cannot be sent to the designated peer,
+     *         <code>false</code> if a failure should result in the FlowFiles remaining in the same partition.
+     */
+    boolean isRebalanceOnFailure();
+
+    /**
+     * @return <code>true</code> if the return value of {@link #getPartition(FlowFileRecord, QueuePartition[], QueuePartition)} will be the same
+     *         regardless of how many times it is called or which FlowFiles are passed. The default implementation returns <code>false</code>.
+     */
+    default boolean isPartitionStatic() {
+        return false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/LocalPartitionPartitioner.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/LocalPartitionPartitioner.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/LocalPartitionPartitioner.java
new file mode 100644
index 0000000..0f9f9f7
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/LocalPartitionPartitioner.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.queue.clustered.partition;
+
+import org.apache.nifi.controller.repository.FlowFileRecord;
+
+/**
+ * A {@link FlowFilePartitioner} that always selects the local partition that it is given.
+ */
+public class LocalPartitionPartitioner implements FlowFilePartitioner {
+
+    /**
+     * Selects the local partition. Neither the FlowFile nor the array of partitions is consulted.
+     *
+     * @param flowFile the FlowFile to partition (not consulted)
+     * @param partitions the partitions to choose from (not consulted)
+     * @param localPartition the local partition
+     * @return <code>localPartition</code>, exactly as it was passed in
+     */
+    @Override
+    public QueuePartition getPartition(final FlowFileRecord flowFile, final QueuePartition[] partitions, final QueuePartition localPartition) {
+        final QueuePartition chosen = localPartition;
+        return chosen;
+    }
+
+    /**
+     * @return <code>false</code>, always
+     */
+    @Override
+    public boolean isRebalanceOnClusterResize() {
+        return false;
+    }
+
+    /**
+     * @return <code>false</code>, always
+     */
+    @Override
+    public boolean isRebalanceOnFailure() {
+        return false;
+    }
+
+    /**
+     * @return <code>true</code>, always: the chosen partition does not depend on the FlowFile that is passed
+     */
+    @Override
+    public boolean isPartitionStatic() {
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/LocalQueuePartition.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/LocalQueuePartition.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/LocalQueuePartition.java
new file mode 100644
index 0000000..9ee0e0e
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/LocalQueuePartition.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.queue.clustered.partition;
+
+import org.apache.nifi.controller.queue.FlowFileQueueContents;
+import org.apache.nifi.controller.queue.LocalQueuePartitionDiagnostics;
+import org.apache.nifi.controller.repository.FlowFileRecord;
+import org.apache.nifi.processor.FlowFileFilter;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * An extension of a Queue Partition that contains the methods necessary for Processors, Funnels, and Ports to interact with the Partition
+ * as if it were an entire FlowFile Queue itself.
+ */
+public interface LocalQueuePartition extends QueuePartition {
+    /**
+     * @return <code>true</code> if the active queue is empty, <code>false</code> otherwise
+     */
+    boolean isActiveQueueEmpty();
+
+    /**
+     * @return <code>true</code> if there is at least one FlowFile that has not yet been acknowledged, <code>false</code> if all FlowFiles have been
+     *         acknowledged.
+     */
+    boolean isUnacknowledgedFlowFile();
+
+    /**
+     * Returns a single FlowFile with the highest priority that is available in the partition, or <code>null</code> if no FlowFile is available
+     *
+     * @param expiredRecords a Set of FlowFileRecords to which any expired records that are encountered should be added
+     * @return a single FlowFile with the highest priority that is available in the partition, or <code>null</code> if no FlowFile is available
+     */
+    FlowFileRecord poll(Set<FlowFileRecord> expiredRecords);
+
+    /**
+     * Returns up to <code>maxResults</code> FlowFiles from the queue
+     *
+     * @param maxResults the maximum number of FlowFiles to return
+     * @param expiredRecords a Set of FlowFileRecords to which any expired records that are encountered should be added
+     * @return a List of FlowFiles (possibly empty) with the highest priority FlowFiles that are available in the partition
+     */
+    List<FlowFileRecord> poll(int maxResults, Set<FlowFileRecord> 
expiredRecords);
+
+    /**
+     * Returns a List of FlowFiles that match the given filter
+     *
+     * @param filter the filter to determine whether or not a given FlowFile should be returned
+     * @param expiredRecords a Set of FlowFileRecords to which any expired records that are encountered should be added
+     * @return a List of FlowFiles (possibly empty) with FlowFiles that matched the given filter
+     */
+    List<FlowFileRecord> poll(FlowFileFilter filter, Set<FlowFileRecord> 
expiredRecords);
+
+    /**
+     * Acknowledges that the given FlowFile has been accounted for and is no longer the responsibility of this partition
+     * @param flowFile the FlowFile that has been accounted for
+     */
+    void acknowledge(FlowFileRecord flowFile);
+
+    /**
+     * Acknowledges that the given FlowFiles have been accounted for and are no longer the responsibility of this partition
+     * @param flowFiles the FlowFiles that have been accounted for
+     */
+    void acknowledge(Collection<FlowFileRecord> flowFiles);
+
+    /**
+     * Returns the FlowFile with the given UUID, or <code>null</code> if the FlowFile with that UUID is not found in the partition
+     *
+     * @param flowFileUuid the UUID of the FlowFile
+     * @return the FlowFile with the given UUID or <code>null</code> if the FlowFile cannot be found
+     * @throws IOException if unable to read swapped data from a swap file
+     */
+    FlowFileRecord getFlowFile(final String flowFileUuid) throws IOException;
+
+    /**
+     * Returns the FlowFiles that can be provided as the result of a "List FlowFiles" action
+     * @return a List of FlowFiles
+     */
+    List<FlowFileRecord> getListableFlowFiles();
+
+    /**
+     * Inherits the contents of another queue/partition
+     * @param queueContents the contents to inherit
+     */
+    void inheritQueueContents(FlowFileQueueContents queueContents);
+
+    /**
+     * @return diagnostics information about the queue partition
+     */
+    LocalQueuePartitionDiagnostics getQueueDiagnostics();
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/QueuePartition.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/QueuePartition.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/QueuePartition.java
new file mode 100644
index 0000000..c73525b
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/QueuePartition.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.queue.clustered.partition;
+
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.controller.queue.DropFlowFileRequest;
+import org.apache.nifi.controller.queue.FlowFileQueueContents;
+import org.apache.nifi.controller.queue.QueueSize;
+import org.apache.nifi.controller.repository.FlowFileRecord;
+import org.apache.nifi.controller.repository.SwapSummary;
+import org.apache.nifi.flowfile.FlowFilePrioritizer;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Represents a portion of a FlowFile Queue such that a FlowFile Queue can be broken into
+ * a local queue partition and 0 or more Remote Queue Partitions.
+ */
+public interface QueuePartition {
+    /**
+     * Discovers any FlowFiles that have been swapped out, returning a summary of the swap files' contents
+     * @return a summary of the swap files' contents
+     */
+    SwapSummary recoverSwappedFlowFiles();
+
+    /**
+     * @return the Node Identifier that this Queue Partition corresponds to, or an empty Optional if the Node Identifier is not yet known.
+     */
+    Optional<NodeIdentifier> getNodeIdentifier();
+
+    /**
+     * @return the name of the Partition that is used when serializing swap flowfiles in order to denote that a swap file belongs to this partition
+     */
+    String getSwapPartitionName();
+
+    /**
+     * Adds the given FlowFile to this partition
+     * @param flowFile the FlowFile to add
+     */
+    void put(FlowFileRecord flowFile);
+
+    /**
+     * Adds the given FlowFiles to this partition
+     * @param flowFiles the FlowFiles to add
+     */
+    void putAll(Collection<FlowFileRecord> flowFiles);
+
+    /**
+     * Drops the FlowFiles in this partition
+     * @param dropRequest the FlowFile Drop Request
+     * @param requestor the user making the request
+     */
+    void dropFlowFiles(DropFlowFileRequest dropRequest, String requestor);
+
+    /**
+     * Updates the prioritizers to use when queueing data
+     * @param newPriorities the new priorities
+     */
+    void setPriorities(List<FlowFilePrioritizer> newPriorities);
+
+    /**
+     * Starts distributing FlowFiles to their desired destinations
+     *
+     * @param flowFilePartitioner the Partitioner that is being used to determine which FlowFiles should belong to this Partition
+     */
+    void start(FlowFilePartitioner flowFilePartitioner);
+
+    /**
+     * Stop distributing FlowFiles to other nodes in the cluster. This does not interrupt any active transactions but will cause the
+     * partition to not create any more transactions until it is started again.
+     */
+    void stop();
+
+    /**
+     * Provides a {@link FlowFileQueueContents} that can be transferred to another partition
+     * @param newPartitionName the name of the partition to which the data is being transferred (see {@link #getSwapPartitionName()}).
+     * @return the contents of the queue
+     */
+    FlowFileQueueContents packageForRebalance(String newPartitionName);
+
+    /**
+     * @return the current size of the partition's queue
+     */
+    QueueSize size();
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/RebalancingPartition.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/RebalancingPartition.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/RebalancingPartition.java
new file mode 100644
index 0000000..9713814
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/RebalancingPartition.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.queue.clustered.partition;
+
+import org.apache.nifi.controller.queue.FlowFileQueueContents;
+import org.apache.nifi.controller.repository.FlowFileRecord;
+
+import java.util.Collection;
+
+/**
+ * A partition whose sole job it is to redistribute FlowFiles to the appropriate partitions.
+ */
+public interface RebalancingPartition extends QueuePartition {
+
+    /**
+     * Inherits all of the FlowFiles, including FlowFiles that have been swapped out, in order to
+     * redistribute them across the cluster
+     *
+     * @param queueContents the contents of a FlowFileQueue (or partition)
+     */
+    void rebalance(FlowFileQueueContents queueContents);
+
+    /**
+     * Inherits all of the given FlowFiles in order to redistribute them across the cluster
+     *
+     * @param flowFiles the FlowFiles to redistribute
+     */
+    void rebalance(Collection<FlowFileRecord> flowFiles);
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/RemoteQueuePartition.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/RemoteQueuePartition.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/RemoteQueuePartition.java
new file mode 100644
index 0000000..a78de55
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/RemoteQueuePartition.java
@@ -0,0 +1,352 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.queue.clustered.partition;
+
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.controller.queue.DropFlowFileRequest;
+import org.apache.nifi.controller.queue.FlowFileQueueContents;
+import org.apache.nifi.controller.queue.LoadBalancedFlowFileQueue;
+import org.apache.nifi.controller.queue.QueueSize;
+import org.apache.nifi.controller.queue.RemoteQueuePartitionDiagnostics;
+import 
org.apache.nifi.controller.queue.StandardRemoteQueuePartitionDiagnostics;
+import org.apache.nifi.controller.queue.SwappablePriorityQueue;
+import org.apache.nifi.controller.queue.clustered.TransferFailureDestination;
+import 
org.apache.nifi.controller.queue.clustered.client.async.AsyncLoadBalanceClientRegistry;
+import 
org.apache.nifi.controller.queue.clustered.client.async.TransactionCompleteCallback;
+import 
org.apache.nifi.controller.queue.clustered.client.async.TransactionFailureCallback;
+import org.apache.nifi.controller.repository.ContentNotFoundException;
+import org.apache.nifi.controller.repository.ContentRepository;
+import org.apache.nifi.controller.repository.FlowFileRecord;
+import org.apache.nifi.controller.repository.FlowFileRepository;
+import org.apache.nifi.controller.repository.RepositoryRecord;
+import org.apache.nifi.controller.repository.StandardRepositoryRecord;
+import org.apache.nifi.controller.repository.SwapSummary;
+import org.apache.nifi.controller.repository.claim.ContentClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaim;
+import org.apache.nifi.flowfile.FlowFilePrioritizer;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.provenance.ProvenanceEventBuilder;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventRepository;
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.provenance.StandardProvenanceEventRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/**
+ * A Queue Partition that is responsible for transferring FlowFiles to another node in the cluster
+ */
+public class RemoteQueuePartition implements QueuePartition {
+    private static final Logger logger = LoggerFactory.getLogger(RemoteQueuePartition.class);
+
+    // The node that this partition corresponds to, the queue that backs the partition, the FlowFile Queue
+    // that owns it, and the destination that FlowFiles are handed to when a transaction fails.
+    private final NodeIdentifier nodeIdentifier;
+    private final SwappablePriorityQueue priorityQueue;
+    private final LoadBalancedFlowFileQueue flowFileQueue;
+    private final TransferFailureDestination failureDestination;
+
+    // Repositories and the load-balance client registry supplied through the constructor.
+    private final FlowFileRepository flowFileRepo;
+    private final ProvenanceEventRepository provRepo;
+    private final ContentRepository contentRepo;
+    private final AsyncLoadBalanceClientRegistry clientRegistry;
+
+    // Becomes true once start() has registered this partition with the client registry.
+    private boolean running;
+    // Text built in the constructor from the queue identifier and the node identifier.
+    private final String description;
+
+    /**
+     * Creates a partition for the given node. Every argument is stored as-is; nothing is registered with the
+     * client registry until {@link #start(FlowFilePartitioner)} is called.
+     *
+     * @param nodeId the identifier of the node that this partition corresponds to
+     * @param priorityQueue the queue that holds this partition's FlowFiles
+     * @param failureDestination the destination that receives FlowFiles when a transaction fails
+     * @param flowFileRepo the FlowFile Repository
+     * @param provRepo the Provenance Repository
+     * @param contentRepository the Content Repository
+     * @param clientRegistry the registry with which this partition registers and unregisters itself
+     * @param flowFileQueue the FlowFile Queue that this partition belongs to
+     */
+    public RemoteQueuePartition(final NodeIdentifier nodeId, final SwappablePriorityQueue priorityQueue, final TransferFailureDestination failureDestination,
+                                final FlowFileRepository flowFileRepo, final ProvenanceEventRepository provRepo, final ContentRepository contentRepository,
+                                final AsyncLoadBalanceClientRegistry clientRegistry, final LoadBalancedFlowFileQueue flowFileQueue) {
+
+        // Identity of the partition and the queues it works with
+        this.nodeIdentifier = nodeId;
+        this.flowFileQueue = flowFileQueue;
+        this.priorityQueue = priorityQueue;
+        this.failureDestination = failureDestination;
+
+        // Repositories and the client registry
+        this.clientRegistry = clientRegistry;
+        this.contentRepo = contentRepository;
+        this.provRepo = provRepo;
+        this.flowFileRepo = flowFileRepo;
+
+        this.description = "RemoteQueuePartition[queueId=" + flowFileQueue.getIdentifier() + ", nodeId=" + nodeId + "]";
+    }
+
+    /**
+     * @return the size reported by this partition's priority queue
+     */
+    @Override
+    public QueueSize size() {
+        final QueueSize currentSize = priorityQueue.size();
+        return currentSize;
+    }
+
+    /**
+     * @return the id of the node that this partition corresponds to, which serves as the swap partition name
+     */
+    @Override
+    public String getSwapPartitionName() {
+        final String partitionName = nodeIdentifier.getId();
+        return partitionName;
+    }
+
+    /**
+     * @return an Optional wrapping the Node Identifier given at construction; empty if that identifier is <code>null</code>
+     */
+    @Override
+    public Optional<NodeIdentifier> getNodeIdentifier() {
+        final Optional<NodeIdentifier> wrapped = Optional.ofNullable(this.nodeIdentifier);
+        return wrapped;
+    }
+
+    /**
+     * Adds the given FlowFile to this partition's priority queue.
+     *
+     * @param flowFile the FlowFile to add
+     */
+    @Override
+    public void put(final FlowFileRecord flowFile) {
+        this.priorityQueue.put(flowFile);
+    }
+
+    /**
+     * Adds the given FlowFiles to this partition's priority queue.
+     *
+     * @param flowFiles the FlowFiles to add
+     */
+    @Override
+    public void putAll(final Collection<FlowFileRecord> flowFiles) {
+        this.priorityQueue.putAll(flowFiles);
+    }
+
+    /**
+     * Passes the drop request on to this partition's priority queue.
+     *
+     * @param dropRequest the FlowFile Drop Request
+     * @param requestor the user making the request
+     */
+    @Override
+    public void dropFlowFiles(final DropFlowFileRequest dropRequest, final String requestor) {
+        this.priorityQueue.dropFlowFiles(dropRequest, requestor);
+    }
+
+    /**
+     * @return the swap summary produced by this partition's priority queue
+     */
+    @Override
+    public SwapSummary recoverSwappedFlowFiles() {
+        final SwapSummary summary = priorityQueue.recoverSwappedFlowFiles();
+        return summary;
+    }
+
+    /**
+     * Asks this partition's priority queue to package its contents for the named partition.
+     *
+     * @param newPartitionName the name of the partition to which the data is being transferred
+     * @return the contents packaged by the priority queue
+     */
+    @Override
+    public FlowFileQueueContents packageForRebalance(final String newPartitionName) {
+        final FlowFileQueueContents packaged = priorityQueue.packageForRebalance(newPartitionName);
+        return packaged;
+    }
+
+    /**
+     * Passes the new prioritizers on to this partition's priority queue.
+     *
+     * @param newPriorities the new priorities
+     */
+    @Override
+    public void setPriorities(final List<FlowFilePrioritizer> newPriorities) {
+        this.priorityQueue.setPriorities(newPriorities);
+    }
+
+    /**
+     * Polls the priority queue for a FlowFile, passing the FlowFile Queue's expiration period in milliseconds, and
+     * then hands the set of expired records collected during the poll to the FlowFile Queue.
+     *
+     * @return whatever the priority queue's poll returned
+     */
+    private FlowFileRecord getFlowFile() {
+        final Set<FlowFileRecord> expiredRecords = new HashSet<>();
+        final FlowFileRecord polled = priorityQueue.poll(expiredRecords, flowFileQueue.getFlowFileExpiration(TimeUnit.MILLISECONDS));
+
+        // The expired set is handed over even if it is empty, exactly as the poll left it.
+        flowFileQueue.handleExpiredRecords(expiredRecords);
+        return polled;
+    }
+
+    /**
+     * Starts this partition by registering it with the client registry, supplying the callbacks that are invoked
+     * when a transaction fails or completes. If the partition is already running, this method returns immediately.
+     *
+     * @param partitioner the Partitioner that is handed to the failure destination whenever FlowFiles are transferred to it
+     */
+    @Override
+    public synchronized void start(final FlowFilePartitioner partitioner) {
+        if (running) {
+            return;
+        }
+
+        final TransactionFailureCallback failureCallback = new TransactionFailureCallback() {
+            @Override
+            public void onTransactionFailed(final List<FlowFileRecord> flowFiles, final Exception cause, final TransactionPhase phase) {
+                // On failure, the FlowFiles that were pulled from the queue are acknowledged and then handed to the
+                // failure destination. flowFileQueue#onTransfer is not called for them because the size of the
+                // FlowFileQueue itself has not changed: the FlowFiles were only re-queued or moved between partitions.
+                priorityQueue.acknowledge(flowFiles);
+
+                Optional<FlowFileRecord> flowFileMissingContent = Optional.empty();
+                if (cause instanceof ContentNotFoundException) {
+                    flowFileMissingContent = ((ContentNotFoundException) cause).getFlowFile();
+                }
+
+                if (!flowFileMissingContent.isPresent()) {
+                    transferToFailureDestination(flowFiles, phase);
+                    return;
+                }
+
+                // Content was not found for a specific FlowFile: create a RepositoryRecord for it, mark it as aborted and
+                // update the repositories accordingly. This follows the same pattern as StandardProcessSession so that
+                // the case is handled consistently.
+                final FlowFileRecord abortedFlowFile = flowFileMissingContent.get();
+                final List<FlowFileRecord> remainingFlowFiles = new ArrayList<>(flowFiles);
+                remainingFlowFiles.remove(abortedFlowFile);
+
+                final StandardRepositoryRecord abortRecord = new StandardRepositoryRecord(flowFileQueue, abortedFlowFile);
+                abortRecord.markForAbort();
+                updateRepositories(Collections.emptyList(), Collections.singleton(abortRecord));
+
+                transferToFailureDestination(remainingFlowFiles, phase);
+
+                // The aborted FlowFile will not be re-queued, so the queue size must be updated for it.
+                flowFileQueue.onTransfer(Collections.singleton(abortedFlowFile));
+            }
+
+            // Hands FlowFiles to the failure destination. If the failure happened while connecting to the node, the
+            // entire contents of the priority queue are transferred first, followed by the given FlowFiles.
+            private void transferToFailureDestination(final List<FlowFileRecord> toTransfer, final TransactionPhase phase) {
+                if (phase == TransactionPhase.CONNECTING) {
+                    failureDestination.putAll(priorityQueue::packageForRebalance, partitioner);
+                }
+                failureDestination.putAll(toTransfer, partitioner);
+            }
+
+            @Override
+            public boolean isRebalanceOnFailure() {
+                return failureDestination.isRebalanceOnFailure(partitioner);
+            }
+        };
+
+        final TransactionCompleteCallback successCallback = new TransactionCompleteCallback() {
+            @Override
+            public void onTransactionComplete(final List<FlowFileRecord> sent) {
+                // The transaction is complete: acknowledge the FlowFiles with the queue so that its size
+                // remains accurate, then update the repositories.
+                priorityQueue.acknowledge(sent);
+                flowFileQueue.onTransfer(sent);
+                updateRepositories(sent, Collections.emptyList());
+            }
+        };
+
+        clientRegistry.register(flowFileQueue.getIdentifier(), nodeIdentifier, priorityQueue::isEmpty, this::getFlowFile,
+            failureCallback, successCallback, flowFileQueue::getLoadBalanceCompression, flowFileQueue::isPropagateBackpressureAcrossNodes);
+
+        running = true;
+    }
+
+    /**
+     * Unregisters the pairing of this partition's FlowFile queue identifier and node identifier
+     * from the client registry.
+     */
+    public void onRemoved() {
+        clientRegistry.unregister(flowFileQueue.getIdentifier(), nodeIdentifier);
+    }
+
+
+    /**
+     * Records the outcome of a transfer: registers Provenance events, decrements claimant counts in the
+     * Content Repository, and finally updates the FlowFile Repository.
+     *
+     * @param flowFilesSent the FlowFiles that were sent to another node.
+     * @param abortedRecords the Repository Records for any FlowFile whose content was missing.
+     */
+    private void updateRepositories(final List<FlowFileRecord> flowFilesSent, final Collection<RepositoryRecord> abortedRecords) {
+        // Ordering is deliberate:
+        //   1. Provenance Repository - so that a restart before the FlowFile Repository update still leaves a record that the data was sent.
+        //   2. Content claim counts  - so that when the FlowFile Repository is sync'ed to disk, it is known which Content Claims are unused.
+        //   3. FlowFile Repository   - last; updating it first could hold Content Claims on disk longer than necessary.
+        //
+        // The sent FlowFiles are walked more than once on purpose. A single pass would require holding the Provenance Events
+        // and the Repository Records in heap simultaneously; separate passes let the events become garbage before the
+        // Repository Records are created.
+        final List<ProvenanceEventRecord> events = new ArrayList<>(flowFilesSent.size() * 2 + abortedRecords.size());
+        for (final FlowFileRecord sentFlowFile : flowFilesSent) {
+            events.add(createSendEvent(sentFlowFile));
+            events.add(createDropEvent(sentFlowFile));
+        }
+        for (final RepositoryRecord aborted : abortedRecords) {
+            final FlowFileRecord missingContentFlowFile = aborted.getCurrent();
+            events.add(createDropEvent(missingContentFlowFile, "Content Not Found"));
+        }
+        provRepo.registerEvents(events);
+
+        // One delete record per sent FlowFile, followed by the already-built abort records.
+        final List<RepositoryRecord> repoRecords = new ArrayList<>();
+        for (final FlowFileRecord sentFlowFile : flowFilesSent) {
+            repoRecords.add(createRepositoryRecord(sentFlowFile));
+        }
+        repoRecords.addAll(abortedRecords);
+
+        // Release the claim held by each record before touching the FlowFile Repository.
+        for (final RepositoryRecord repoRecord : repoRecords) {
+            contentRepo.decrementClaimantCount(repoRecord.getCurrentClaim());
+        }
+
+        try {
+            flowFileRepo.updateRepository(repoRecords);
+        } catch (final Exception e) {
+            // Failure is logged rather than propagated; the transfer itself has already taken place.
+            logger.error("Unable to update FlowFile repository to indicate that {} FlowFiles have been transferred to {}. "
+                    + "It is possible that these FlowFiles will be duplicated upon restart of NiFi.", flowFilesSent.size(), getNodeIdentifier(), e);
+        }
+    }
+
+    /**
+     * Builds a Repository Record for the given FlowFile against this partition's FlowFile queue
+     * and marks that record for deletion.
+     *
+     * @param flowFile the FlowFile the record describes
+     * @return the record, already marked for delete
+     */
+    private RepositoryRecord createRepositoryRecord(final FlowFileRecord flowFile) {
+        final StandardRepositoryRecord deleteRecord = new StandardRepositoryRecord(flowFileQueue, flowFile);
+        deleteRecord.markForDelete();
+        return deleteRecord;
+    }
+
+    /**
+     * Builds a SEND Provenance event for a FlowFile, using this partition's FlowFile queue as the
+     * component, source queue, and transit URI.
+     *
+     * @param flowFile the FlowFile the event describes
+     * @return the SEND event; content claim details are included only when the FlowFile has a content claim
+     */
+    private ProvenanceEventRecord createSendEvent(final FlowFileRecord flowFile) {
+        final String queueId = flowFileQueue.getIdentifier();
+
+        final ProvenanceEventBuilder eventBuilder = new StandardProvenanceEventRecord.Builder()
+                .fromFlowFile(flowFile)
+                .setEventType(ProvenanceEventType.SEND)
+                .setDetails("Re-distributed for Load-balanced connection")
+                .setComponentId(queueId)
+                .setComponentType("Connection")
+                .setSourceQueueIdentifier(queueId)
+                .setSourceSystemFlowFileIdentifier(flowFile.getAttribute(CoreAttributes.UUID.key()))
+                .setTransitUri("nifi:connection:" + queueId);
+
+        final ContentClaim claim = flowFile.getContentClaim();
+        if (claim == null) {
+            // No content claim: nothing further to attach to the event.
+            return eventBuilder.build();
+        }
+
+        // The current and previous claims are populated with the same values.
+        final ResourceClaim resource = claim.getResourceClaim();
+        final long claimOffset = claim.getOffset() + flowFile.getContentClaimOffset();
+        final long contentSize = flowFile.getSize();
+        eventBuilder.setCurrentContentClaim(resource.getContainer(), resource.getSection(), resource.getId(), claimOffset, contentSize);
+        eventBuilder.setPreviousContentClaim(resource.getContainer(), resource.getSection(), resource.getId(), claimOffset, contentSize);
+
+        return eventBuilder.build();
+    }
+
+    private ProvenanceEventRecord createDropEvent(final FlowFileRecord 
flowFile) {
+        return createDropEvent(flowFile, null);
+    }
+
+    /**
+     * Builds a DROP Provenance event for a FlowFile, using this partition's FlowFile queue as the
+     * component and source queue.
+     *
+     * @param flowFile the FlowFile the event describes
+     * @param details the details text to set on the event; may be {@code null}
+     * @return the DROP event; content claim details are included only when the FlowFile has a content claim
+     */
+    private ProvenanceEventRecord createDropEvent(final FlowFileRecord flowFile, final String details) {
+        final String queueId = flowFileQueue.getIdentifier();
+
+        final ProvenanceEventBuilder eventBuilder = new StandardProvenanceEventRecord.Builder()
+                .fromFlowFile(flowFile)
+                .setEventType(ProvenanceEventType.DROP)
+                .setDetails(details)
+                .setComponentId(queueId)
+                .setComponentType("Connection")
+                .setSourceQueueIdentifier(queueId);
+
+        final ContentClaim claim = flowFile.getContentClaim();
+        if (claim == null) {
+            // No content claim: nothing further to attach to the event.
+            return eventBuilder.build();
+        }
+
+        // The current and previous claims are populated with the same values.
+        final ResourceClaim resource = claim.getResourceClaim();
+        final long claimOffset = claim.getOffset() + flowFile.getContentClaimOffset();
+        final long contentSize = flowFile.getSize();
+        eventBuilder.setCurrentContentClaim(resource.getContainer(), resource.getSection(), resource.getId(), claimOffset, contentSize);
+        eventBuilder.setPreviousContentClaim(resource.getContainer(), resource.getSection(), resource.getId(), claimOffset, contentSize);
+
+        return eventBuilder.build();
+    }
+
+
+    /**
+     * Clears the running flag and then unregisters this partition's queue/node pairing from the
+     * client registry.
+     */
+    @Override
+    public synchronized void stop() {
+        running = false;
+        clientRegistry.unregister(flowFileQueue.getIdentifier(), nodeIdentifier);
+    }
+
+    /**
+     * Builds a diagnostics snapshot from the node identifier's string form and the current size
+     * reported by the underlying priority queue.
+     *
+     * @return a newly created diagnostics object
+     */
+    public RemoteQueuePartitionDiagnostics getDiagnostics() {
+        final String nodeDescription = nodeIdentifier.toString();
+        return new StandardRemoteQueuePartitionDiagnostics(nodeDescription, priorityQueue.getFlowFileQueueSize());
+    }
+
+    /**
+     * @return the description string held by this partition
+     */
+    @Override
+    public String toString() {
+        return this.description;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/RoundRobinPartitioner.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/RoundRobinPartitioner.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/RoundRobinPartitioner.java
new file mode 100644
index 0000000..c08724d
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/RoundRobinPartitioner.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.queue.clustered.partition;
+
+import org.apache.nifi.controller.repository.FlowFileRecord;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * A {@link FlowFilePartitioner} that cycles through the supplied partitions in order, based on a
+ * shared invocation counter. Neither the FlowFile nor the local partition influences the choice.
+ */
+public class RoundRobinPartitioner implements FlowFilePartitioner {
+    // Number of getPartition invocations so far; atomic so concurrent callers each observe a distinct value.
+    private final AtomicLong counter = new AtomicLong(0L);
+
+    /**
+     * Returns the next partition in rotation.
+     *
+     * @param flowFile the FlowFile being partitioned (not consulted)
+     * @param partitions the candidate partitions; an empty array results in an {@link ArithmeticException}
+     * @param localPartition the local partition (not consulted)
+     * @return the partition at index {@code counter mod partitions.length}
+     */
+    @Override
+    public QueuePartition getPartition(final FlowFileRecord flowFile, final QueuePartition[] partitions, final QueuePartition localPartition) {
+        final long count = counter.getAndIncrement();
+
+        // floorMod (rather than %) keeps the index non-negative even if the counter ever wraps past
+        // Long.MAX_VALUE into negative values, which would otherwise yield a negative array index.
+        final int index = (int) Math.floorMod(count, (long) partitions.length);
+        return partitions[index];
+    }
+
+    /**
+     * @return {@code false}; this partitioner does not request a rebalance when the cluster is resized
+     */
+    @Override
+    public boolean isRebalanceOnClusterResize() {
+        return false;
+    }
+
+    /**
+     * @return {@code true}; this partitioner requests a rebalance on failure
+     */
+    @Override
+    public boolean isRebalanceOnFailure() {
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/StandardRebalancingPartition.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/StandardRebalancingPartition.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/StandardRebalancingPartition.java
new file mode 100644
index 0000000..74a8aa6
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/StandardRebalancingPartition.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.queue.clustered.partition;
+
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.controller.queue.BlockingSwappablePriorityQueue;
+import org.apache.nifi.controller.queue.DropFlowFileAction;
+import org.apache.nifi.controller.queue.DropFlowFileRequest;
+import org.apache.nifi.controller.queue.FlowFileQueueContents;
+import org.apache.nifi.controller.queue.LoadBalancedFlowFileQueue;
+import org.apache.nifi.controller.queue.QueueSize;
+import org.apache.nifi.controller.repository.FlowFileRecord;
+import org.apache.nifi.controller.repository.FlowFileSwapManager;
+import org.apache.nifi.controller.repository.SwapSummary;
+import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.flowfile.FlowFilePrioritizer;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * A {@link RebalancingPartition} that buffers FlowFiles in its own swappable queue and, while started,
+ * uses a background thread to hand them back to the owning {@link LoadBalancedFlowFileQueue} for
+ * re-distribution across partitions.
+ */
+public class StandardRebalancingPartition implements RebalancingPartition {
+    private static final String SWAP_PARTITION_NAME = "rebalance";
+
+    // How long a single blocking poll waits before the task re-checks whether it has been stopped.
+    private static final long POLL_WAIT_MILLIS = 100L;
+
+    // After one FlowFile has been polled, up to this many more are pulled in the same batch (1,000 total).
+    private static final int MAX_ADDITIONAL_FLOWFILES_PER_BATCH = 999;
+
+    private final String queueIdentifier;
+    private final BlockingSwappablePriorityQueue queue;
+    private final LoadBalancedFlowFileQueue flowFileQueue;
+    private final String description;
+
+    private volatile boolean stopped = true;
+
+    // The task backing the currently running rebalance thread, or null if no thread is running.
+    // Read and written only from synchronized methods of this class.
+    private RebalanceTask rebalanceTask;
+
+
+    /**
+     * Creates a partition in the stopped state.
+     *
+     * @param swapManager passed through to the underlying queue
+     * @param swapThreshold passed through to the underlying queue
+     * @param eventReporter passed through to the underlying queue
+     * @param flowFileQueue the queue that owns this partition and receives the redistributed FlowFiles
+     * @param dropAction passed through to the underlying queue
+     */
+    public StandardRebalancingPartition(final FlowFileSwapManager swapManager, final int swapThreshold, final EventReporter eventReporter,
+                                        final LoadBalancedFlowFileQueue flowFileQueue, final DropFlowFileAction dropAction) {
+
+        this.queue = new BlockingSwappablePriorityQueue(swapManager, swapThreshold, eventReporter, flowFileQueue, dropAction, SWAP_PARTITION_NAME);
+        this.queueIdentifier = flowFileQueue.getIdentifier();
+        this.flowFileQueue = flowFileQueue;
+        this.description = "RebalancingPartition[queueId=" + queueIdentifier + "]";
+    }
+
+    /**
+     * @return always an empty Optional; this partition is not associated with a node
+     */
+    @Override
+    public Optional<NodeIdentifier> getNodeIdentifier() {
+        return Optional.empty();
+    }
+
+    @Override
+    public QueueSize size() {
+        return queue.size();
+    }
+
+    @Override
+    public SwapSummary recoverSwappedFlowFiles() {
+        return this.queue.recoverSwappedFlowFiles();
+    }
+
+    @Override
+    public String getSwapPartitionName() {
+        return SWAP_PARTITION_NAME;
+    }
+
+    @Override
+    public void put(final FlowFileRecord flowFile) {
+        queue.put(flowFile);
+    }
+
+    @Override
+    public void putAll(final Collection<FlowFileRecord> flowFiles) {
+        queue.putAll(flowFiles);
+    }
+
+    @Override
+    public void dropFlowFiles(DropFlowFileRequest dropRequest, String requestor) {
+        queue.dropFlowFiles(dropRequest, requestor);
+    }
+
+    @Override
+    public void setPriorities(final List<FlowFilePrioritizer> newPriorities) {
+        queue.setPriorities(newPriorities);
+    }
+
+    /**
+     * Marks the partition as started and launches the rebalance thread if one is not already running.
+     *
+     * @param partitionerUsed not used by this implementation
+     */
+    @Override
+    public synchronized void start(final FlowFilePartitioner partitionerUsed) {
+        stopped = false;
+        rebalanceFromQueue();
+    }
+
+    /**
+     * Marks the partition as stopped and signals the current rebalance task, if any, to stop.
+     * The task's thread is not joined; it exits on its own once it observes the stop flag.
+     */
+    @Override
+    public synchronized void stop() {
+        stopped = true;
+
+        if (this.rebalanceTask != null) {
+            this.rebalanceTask.stop();
+        }
+
+        this.rebalanceTask = null;
+    }
+
+    /**
+     * Starts a rebalance thread unless the partition is stopped or a task is already registered.
+     */
+    private synchronized void rebalanceFromQueue() {
+        if (stopped) {
+            return;
+        }
+
+        // If a task is already defined, do nothing. There's already a thread running.
+        if (rebalanceTask != null) {
+            return;
+        }
+
+        this.rebalanceTask = new RebalanceTask();
+
+        final Thread rebalanceThread = new Thread(this.rebalanceTask);
+        rebalanceThread.setName("Rebalance queued data for Connection " + queueIdentifier);
+        rebalanceThread.start();
+    }
+
+    /**
+     * Takes over the given queue contents and triggers a rebalance. Does nothing when the contents
+     * hold neither active FlowFiles nor swap locations.
+     */
+    @Override
+    public void rebalance(final FlowFileQueueContents queueContents) {
+        if (queueContents.getActiveFlowFiles().isEmpty() && queueContents.getSwapLocations().isEmpty()) {
+            return;
+        }
+
+        queue.inheritQueueContents(queueContents);
+        rebalanceFromQueue();
+    }
+
+    /**
+     * Enqueues the given FlowFiles and triggers a rebalance.
+     */
+    @Override
+    public void rebalance(final Collection<FlowFileRecord> flowFiles) {
+        queue.putAll(flowFiles);
+        rebalanceFromQueue();
+    }
+
+    @Override
+    public FlowFileQueueContents packageForRebalance(String newPartitionName) {
+        return queue.packageForRebalance(newPartitionName);
+    }
+
+    /**
+     * Called by a rebalance task when a poll returned nothing, to decide whether it may exit.
+     *
+     * @param task the task asking to complete
+     * @return {@code true} if the queue is empty and the task should exit, {@code false} if data remains
+     */
+    private synchronized boolean complete(final RebalanceTask task) {
+        if (!queue.isEmpty()) {
+            return false;
+        }
+
+        // Only clear the reference if it still belongs to the caller. A task that was stopped and then
+        // superseded by a newer one (stop() followed by start()) must not wipe out the newer task's
+        // registration, or a later rebalance could launch a second concurrent thread.
+        if (this.rebalanceTask == task) {
+            this.rebalanceTask = null;
+        }
+        return true;
+    }
+
+
+    /**
+     * Background loop that drains this partition's queue in batches and hands each batch to the
+     * owning FlowFile queue for re-distribution.
+     */
+    private class RebalanceTask implements Runnable {
+        private volatile boolean stopped = false;
+        private final Set<FlowFileRecord> expiredRecords = new HashSet<>();
+
+        public void stop() {
+            stopped = true;
+        }
+
+        @Override
+        public void run() {
+            while (!stopped) {
+                final FlowFileRecord polled;
+
+                expiredRecords.clear();
+
+                // Wait up to POLL_WAIT_MILLIS milliseconds to get a FlowFile. If none, then check if stopped
+                // and if not, poll again.
+                try {
+                    polled = queue.poll(expiredRecords, -1, POLL_WAIT_MILLIS);
+                } catch (final InterruptedException ie) {
+                    // Restore the interrupt flag and re-check the stop flag.
+                    // NOTE(review): if poll() throws immediately while the flag is set, this loop will spin
+                    // until stop() is called - confirm against BlockingSwappablePriorityQueue.poll.
+                    Thread.currentThread().interrupt();
+                    continue;
+                }
+
+                if (polled == null) {
+                    flowFileQueue.handleExpiredRecords(expiredRecords);
+
+                    if (complete(this)) {
+                        return;
+                    } else {
+                        continue;
+                    }
+                }
+
+                // We got 1 FlowFile. Try a second poll to obtain up to 999 more (for a total of 1,000).
+                final List<FlowFileRecord> toDistribute = new ArrayList<>();
+                toDistribute.add(polled);
+
+                final List<FlowFileRecord> additionalRecords = queue.poll(MAX_ADDITIONAL_FLOWFILES_PER_BATCH, expiredRecords, -1);
+                toDistribute.addAll(additionalRecords);
+
+                flowFileQueue.handleExpiredRecords(expiredRecords);
+
+                // Transfer all of the FlowFiles that we got back to the FlowFileQueue itself. This will cause the data to be
+                // re-partitioned and binned appropriately. We also then need to ensure that we acknowledge the data from our
+                // own SwappablePriorityQueue to ensure that the sizes are kept in check.
+                flowFileQueue.distributeToPartitions(toDistribute);
+                queue.acknowledge(toDistribute);
+            }
+        }
+    }
+
+    @Override
+    public String toString() {
+        return description;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/SwappablePriorityQueueLocalPartition.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/SwappablePriorityQueueLocalPartition.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/SwappablePriorityQueueLocalPartition.java
new file mode 100644
index 0000000..e5e64d0
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/partition/SwappablePriorityQueueLocalPartition.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.queue.clustered.partition;
+
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.controller.queue.DropFlowFileAction;
+import org.apache.nifi.controller.queue.DropFlowFileRequest;
+import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.queue.FlowFileQueueContents;
+import org.apache.nifi.controller.queue.LocalQueuePartitionDiagnostics;
+import org.apache.nifi.controller.queue.QueueSize;
+import org.apache.nifi.controller.queue.SwappablePriorityQueue;
+import org.apache.nifi.controller.repository.FlowFileRecord;
+import org.apache.nifi.controller.repository.FlowFileSwapManager;
+import org.apache.nifi.controller.repository.SwapSummary;
+import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.flowfile.FlowFilePrioritizer;
+import org.apache.nifi.processor.FlowFileFilter;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A Local Queue Partition whose implementation is based on the use of a {@link SwappablePriorityQueue}.
+ * Nearly every operation is forwarded directly to that queue.
+ */
+public class SwappablePriorityQueueLocalPartition implements LocalQueuePartition {
+    private static final String SWAP_PARTITION_NAME = "local";
+
+    private final SwappablePriorityQueue priorityQueue;
+    private final FlowFileQueue flowFileQueue;
+    private final String description;
+
+    /**
+     * Creates the partition together with its backing priority queue.
+     *
+     * @param swapManager passed through to the backing queue
+     * @param swapThreshold passed through to the backing queue
+     * @param eventReporter passed through to the backing queue
+     * @param flowFileQueue the owning FlowFile queue; also consulted for the FlowFile expiration when polling
+     * @param dropAction passed through to the backing queue
+     */
+    public SwappablePriorityQueueLocalPartition(final FlowFileSwapManager swapManager, final int swapThreshold, final EventReporter eventReporter,
+            final FlowFileQueue flowFileQueue, final DropFlowFileAction dropAction) {
+        this.flowFileQueue = flowFileQueue;
+        this.description = "SwappablePriorityQueueLocalPartition[queueId=" + flowFileQueue.getIdentifier() + "]";
+        this.priorityQueue = new SwappablePriorityQueue(swapManager, swapThreshold, eventReporter, flowFileQueue, dropAction, SWAP_PARTITION_NAME);
+    }
+
+    @Override
+    public String getSwapPartitionName() {
+        return SWAP_PARTITION_NAME;
+    }
+
+    @Override
+    public QueueSize size() {
+        return this.priorityQueue.size();
+    }
+
+    @Override
+    public boolean isUnacknowledgedFlowFile() {
+        return this.priorityQueue.isUnacknowledgedFlowFile();
+    }
+
+    /**
+     * @return always an empty Optional
+     */
+    @Override
+    public Optional<NodeIdentifier> getNodeIdentifier() {
+        return Optional.empty();
+    }
+
+    @Override
+    public void put(final FlowFileRecord flowFile) {
+        this.priorityQueue.put(flowFile);
+    }
+
+    @Override
+    public void putAll(final Collection<FlowFileRecord> flowFiles) {
+        this.priorityQueue.putAll(flowFiles);
+    }
+
+    @Override
+    public boolean isActiveQueueEmpty() {
+        return this.priorityQueue.isActiveQueueEmpty();
+    }
+
+    /**
+     * Polls a single FlowFile, applying the owning queue's current expiration (in milliseconds).
+     */
+    @Override
+    public FlowFileRecord poll(final Set<FlowFileRecord> expiredRecords) {
+        final int expirationMillis = currentExpirationMillis();
+        return this.priorityQueue.poll(expiredRecords, expirationMillis);
+    }
+
+    /**
+     * Polls up to {@code maxResults} FlowFiles, applying the owning queue's current expiration (in milliseconds).
+     */
+    @Override
+    public List<FlowFileRecord> poll(final int maxResults, final Set<FlowFileRecord> expiredRecords) {
+        final int expirationMillis = currentExpirationMillis();
+        return this.priorityQueue.poll(maxResults, expiredRecords, expirationMillis);
+    }
+
+    /**
+     * Polls the FlowFiles selected by the filter, applying the owning queue's current expiration (in milliseconds).
+     */
+    @Override
+    public List<FlowFileRecord> poll(final FlowFileFilter filter, final Set<FlowFileRecord> expiredRecords) {
+        final int expirationMillis = currentExpirationMillis();
+        return this.priorityQueue.poll(filter, expiredRecords, expirationMillis);
+    }
+
+    // Reads the expiration from the owning FlowFile queue on every call rather than caching it.
+    private int currentExpirationMillis() {
+        return this.flowFileQueue.getFlowFileExpiration(TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public FlowFileRecord getFlowFile(final String flowFileUuid) throws IOException {
+        return this.priorityQueue.getFlowFile(flowFileUuid);
+    }
+
+    /**
+     * @return the backing queue's active FlowFiles
+     */
+    @Override
+    public List<FlowFileRecord> getListableFlowFiles() {
+        return this.priorityQueue.getActiveFlowFiles();
+    }
+
+    @Override
+    public void dropFlowFiles(final DropFlowFileRequest dropRequest, final String requestor) {
+        this.priorityQueue.dropFlowFiles(dropRequest, requestor);
+    }
+
+    @Override
+    public SwapSummary recoverSwappedFlowFiles() {
+        return this.priorityQueue.recoverSwappedFlowFiles();
+    }
+
+    @Override
+    public void setPriorities(final List<FlowFilePrioritizer> newPriorities) {
+        this.priorityQueue.setPriorities(newPriorities);
+    }
+
+    @Override
+    public void acknowledge(final FlowFileRecord flowFile) {
+        this.priorityQueue.acknowledge(flowFile);
+    }
+
+    @Override
+    public void acknowledge(final Collection<FlowFileRecord> flowFiles) {
+        this.priorityQueue.acknowledge(flowFiles);
+    }
+
+    @Override
+    public LocalQueuePartitionDiagnostics getQueueDiagnostics() {
+        return this.priorityQueue.getQueueDiagnostics();
+    }
+
+    @Override
+    public FlowFileQueueContents packageForRebalance(String newPartitionName) {
+        return this.priorityQueue.packageForRebalance(newPartitionName);
+    }
+
+    /**
+     * No-op in this implementation.
+     */
+    @Override
+    public void start(final FlowFilePartitioner partitionerUsed) {
+    }
+
+    /**
+     * No-op in this implementation.
+     */
+    @Override
+    public void stop() {
+    }
+
+    @Override
+    public void inheritQueueContents(final FlowFileQueueContents queueContents) {
+        this.priorityQueue.inheritQueueContents(queueContents);
+    }
+
+    @Override
+    public String toString() {
+        return this.description;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/protocol/LoadBalanceProtocolConstants.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/protocol/LoadBalanceProtocolConstants.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/protocol/LoadBalanceProtocolConstants.java
new file mode 100644
index 0000000..5b02f13
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/protocol/LoadBalanceProtocolConstants.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.queue.clustered.protocol;
+
+/**
+ * Integer codes used by the load-balance protocol, grouped by the part of the exchange they belong to.
+ */
+public class LoadBalanceProtocolConstants {
+    // Protocol negotiation constants
+    public static final int VERSION_ACCEPTED = 0x10;
+    public static final int REQUEST_DIFFERENT_VERSION = 0x11;
+    // Misspelled original name, retained with the same value so that existing references keep compiling.
+    // Prefer REQUEST_DIFFERENT_VERSION in new code.
+    public static final int REQEUST_DIFFERENT_VERSION = REQUEST_DIFFERENT_VERSION;
+    public static final int ABORT_PROTOCOL_NEGOTIATION = 0x12;
+
+    // Transaction constants
+    public static final int CONFIRM_CHECKSUM = 0x21;
+    public static final int REJECT_CHECKSUM = 0x22;
+    public static final int COMPLETE_TRANSACTION = 0x23;
+    public static final int ABORT_TRANSACTION = 0x24;
+    public static final int CONFIRM_COMPLETE_TRANSACTION = 0x25;
+
+    // FlowFile constants
+    public static final int MORE_FLOWFILES = 0x31;
+    public static final int NO_MORE_FLOWFILES = 0x32;
+
+    // Backpressure / Space constants
+    public static final int CHECK_SPACE = 0x61;
+    public static final int SKIP_SPACE_CHECK = 0x62;
+    public static final int SPACE_AVAILABLE = 0x65;
+    public static final int QUEUE_FULL = 0x66;
+
+    // data frame constants
+    public static final int NO_DATA_FRAME = 0x40;
+    public static final int DATA_FRAME_FOLLOWS = 0x42;
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/ClusterLoadBalanceAuthorizer.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/ClusterLoadBalanceAuthorizer.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/ClusterLoadBalanceAuthorizer.java
new file mode 100644
index 0000000..43187b5
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/ClusterLoadBalanceAuthorizer.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.queue.clustered.server;
+
+import org.apache.nifi.cluster.coordination.ClusterCoordinator;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.reporting.Severity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * {@link LoadBalanceAuthorizer} that accepts a client when at least one of its identities equals the API address
+ * of a node reported by the {@link ClusterCoordinator}.
+ */
+public class ClusterLoadBalanceAuthorizer implements LoadBalanceAuthorizer {
+    private static final Logger logger = LoggerFactory.getLogger(ClusterLoadBalanceAuthorizer.class);
+
+    private final ClusterCoordinator clusterCoordinator;
+    private final EventReporter eventReporter;
+
+    public ClusterLoadBalanceAuthorizer(final ClusterCoordinator clusterCoordinator, final EventReporter eventReporter) {
+        this.clusterCoordinator = clusterCoordinator;
+        this.eventReporter = eventReporter;
+    }
+
+    /**
+     * Authorizes the client if any of the given identities matches the API address of a cluster node.
+     *
+     * @param clientIdentities the identities presented by the client; {@code null} is taken to mean that the
+     *            communication is not secured, in which case the client is authorized without further checks
+     * @throws NotAuthorizedException if none of the identities matches a node's API address
+     */
+    @Override
+    public void authorize(final Collection<String> clientIdentities) throws NotAuthorizedException {
+        if (clientIdentities == null) {
+            logger.debug("Client Identities is null, so assuming that Load Balancing communications are not secure. Authorizing client to participate in Load Balancing");
+            return;
+        }
+
+        final Set<String> knownAddresses = apiAddressesOfClusterNodes();
+        for (final String identity : clientIdentities) {
+            if (!knownAddresses.contains(identity)) {
+                continue;
+            }
+
+            // The first matching identity is sufficient
+            logger.debug("Client ID '{}' is in the list of Nodes in the Cluster. Authorizing Client to Load Balance data", identity);
+            return;
+        }
+
+        reject(clientIdentities);
+    }
+
+    /** Collects the API address of every node identifier currently known to the cluster coordinator. */
+    private Set<String> apiAddressesOfClusterNodes() {
+        return clusterCoordinator.getNodeIdentifiers()
+            .stream()
+            .map(NodeIdentifier::getApiAddress)
+            .collect(Collectors.toSet());
+    }
+
+    /** Logs the failure, reports it as a WARNING event, and throws; this method never returns normally. */
+    private void reject(final Collection<String> clientIdentities) throws NotAuthorizedException {
+        final String message = String.format("Authorization failed for Client ID's %s to Load Balance data because none of the ID's are known Cluster Node Identifiers",
+            clientIdentities);
+
+        logger.warn(message);
+        eventReporter.reportEvent(Severity.WARNING, "Load Balanced Connections", message);
+        throw new NotAuthorizedException("Client ID's " + clientIdentities + " are not authorized to Load Balance data");
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/ConnectionLoadBalanceServer.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/ConnectionLoadBalanceServer.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/ConnectionLoadBalanceServer.java
new file mode 100644
index 0000000..93fc2d7
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/ConnectionLoadBalanceServer.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.queue.clustered.server;
+
+import org.apache.nifi.engine.FlowEngine;
+import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.reporting.Severity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLServerSocket;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.SocketTimeoutException;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+
+/**
+ * Socket server for load-balanced connections. A dedicated thread accepts incoming sockets and places them on
+ * an internal queue; worker tasks running in a thread pool take sockets from that queue and pass each one to
+ * the {@link LoadBalanceProtocol} to receive FlowFiles.
+ */
+public class ConnectionLoadBalanceServer {
+    private static final Logger logger = 
LoggerFactory.getLogger(ConnectionLoadBalanceServer.class);
+
+    // Configuration supplied to the constructor
+    private final String hostname;
+    private final int port;
+    private final SSLContext sslContext;
+    private final ExecutorService threadPool;
+    private final LoadBalanceProtocol loadBalanceProtocol;
+    private final int connectionTimeoutMillis;
+    private final int numThreads;
+    private final EventReporter eventReporter;
+
+    // Worker tasks created by start(); the whole set is swapped in with a single assignment
+    private volatile Set<CommunicateAction> communicationActions = 
Collections.emptySet();
+    // Sockets waiting for a worker: filled by AcceptConnection, and by workers that hand a socket back after use
+    private final BlockingQueue<Socket> connectionQueue = new 
LinkedBlockingQueue<>();
+
+    private volatile AcceptConnection acceptConnection;
+    private volatile ServerSocket serverSocket;
+    // Gates start(): start() returns immediately unless this is true
+    private volatile boolean stopped = true;
+
+    /**
+     * Creates the server and its worker thread pool. No socket is opened here; that happens in {@link #start()}.
+     *
+     * @param hostname the host name to listen on; may be null
+     * @param port the port to listen on
+     * @param sslContext the SSL context used to create a secure server socket, or null for a plain socket
+     * @param numThreads the size of the thread pool and the number of worker tasks that start() submits
+     * @param loadBalanceProtocol the protocol that each worker uses to receive FlowFiles from a socket
+     * @param eventReporter receives an ERROR event when a worker fails to communicate with a peer
+     * @param connectionTimeoutMillis the SO_TIMEOUT, in milliseconds, applied to every accepted socket
+     */
+    public ConnectionLoadBalanceServer(final String hostname, final int port, final SSLContext sslContext, final int numThreads, final LoadBalanceProtocol loadBalanceProtocol,
+                                       final EventReporter eventReporter, final int connectionTimeoutMillis) {
+        // Where and how to listen
+        this.hostname = hostname;
+        this.port = port;
+        this.sslContext = sslContext;
+        this.connectionTimeoutMillis = connectionTimeoutMillis;
+
+        // How accepted connections are serviced
+        this.numThreads = numThreads;
+        this.threadPool = new FlowEngine(numThreads, "Load Balance Server");
+        this.loadBalanceProtocol = loadBalanceProtocol;
+        this.eventReporter = eventReporter;
+    }
+
+    /**
+     * Starts the server: opens the server socket, submits {@code numThreads} worker tasks to the thread pool and
+     * launches the thread that accepts incoming connections. Does nothing if the server is not in the stopped
+     * state or if a server socket already exists.
+     *
+     * @throws IOException if the server socket cannot be created; the server is left in the stopped state so
+     *             that the call can be retried
+     */
+    public void start() throws IOException {
+        if (!stopped) {
+            return;
+        }
+
+        stopped = false;
+        if (serverSocket != null) {
+            return;
+        }
+
+        try {
+            serverSocket = createServerSocket();
+        } catch (final Exception e) {
+            // Nothing is running, so restore the flag. Without this, every later call to start() would
+            // return at the guard above and never attempt to bind again.
+            stopped = true;
+            throw new IOException("Could not begin listening for incoming connections in order to load balance data across the cluster. Please verify the values of the " +
+                    "'nifi.cluster.load.balance.port' and 'nifi.cluster.load.balance.host' properties as well as the 'nifi.security.*' properties", e);
+        }
+
+        // One worker task per pool thread; each polls the connection queue until it is told to stop
+        final Set<CommunicateAction> actions = new HashSet<>(numThreads);
+        for (int i=0; i < numThreads; i++) {
+            final CommunicateAction action = new CommunicateAction(loadBalanceProtocol);
+            actions.add(action);
+            threadPool.submit(action);
+        }
+
+        this.communicationActions = actions;
+
+        // Connections are accepted on a dedicated thread so that the workers only ever deal with connected sockets
+        acceptConnection = new AcceptConnection(serverSocket);
+        final Thread receiveConnectionThread = new Thread(acceptConnection);
+        receiveConnectionThread.setName("Receive Queue Load-Balancing Connections");
+        receiveConnectionThread.start();
+    }
+
+    /**
+     * @return the local port reported by the server socket. The socket is created by {@link #start()}, so this
+     *         method must not be called before then.
+     */
+    public int getPort() {
+        final ServerSocket listeningSocket = serverSocket;
+        return listeningSocket.getLocalPort();
+    }
+
+    /**
+     * Stops the server: marks it as stopped, asks the thread pool to shut down, signals the accept loop and
+     * every worker task to finish, and closes any sockets still waiting in the connection queue.
+     */
+    public void stop() {
+        // Was 'false', which left the flag identical to the running state set by start()
+        stopped = true;
+        threadPool.shutdown();
+
+        if (acceptConnection != null) {
+            acceptConnection.stop();
+        }
+
+        communicationActions.forEach(CommunicateAction::stop);
+
+        // Drain the queue so that connections no worker has picked up are not left open
+        Socket socket;
+        while ((socket = connectionQueue.poll()) != null) {
+            try {
+                socket.close();
+                logger.info("{} Closed connection to {} on Server stop", this, socket.getRemoteSocketAddress());
+            } catch (final IOException ioe) {
+                logger.warn("Failed to properly close socket to " + socket.getRemoteSocketAddress(), ioe);
+            }
+        }
+    }
+
+    /**
+     * Creates the listening socket with a backlog of 50. When an SSL context is configured, the socket is
+     * created from it and client authentication is required; otherwise a plain server socket is created.
+     *
+     * @return the newly created server socket
+     * @throws IOException if the host name cannot be resolved or the socket cannot be opened
+     */
+    private ServerSocket createServerSocket() throws IOException {
+        // A null bind address makes the socket accept connections on any local address
+        final InetAddress inetAddress = hostname == null ? null : InetAddress.getByName(hostname);
+
+        if (sslContext == null) {
+            // Use the null-safe address computed above. InetAddress.getByName(null) resolves to the loopback
+            // address, which would make the plain socket bind differently from the SSL socket below.
+            return new ServerSocket(port, 50, inetAddress);
+        } else {
+            final ServerSocket serverSocket = sslContext.getServerSocketFactory().createServerSocket(port, 50, inetAddress);
+            ((SSLServerSocket) serverSocket).setNeedClientAuth(true);
+            return serverSocket;
+        }
+    }
+
+
+    private class CommunicateAction implements Runnable {
+        private final LoadBalanceProtocol loadBalanceProtocol;
+        private volatile boolean stopped = false;
+
+        public CommunicateAction(final LoadBalanceProtocol 
loadBalanceProtocol) {
+            this.loadBalanceProtocol = loadBalanceProtocol;
+        }
+
+        public void stop() {
+            this.stopped = true;
+        }
+
+        @Override
+        public void run() {
+            String peerDescription = "<Unknown Client>";
+
+            while (!stopped) {
+                Socket socket = null;
+                try {
+                    socket = connectionQueue.poll(1, TimeUnit.SECONDS);
+                    if (socket == null) {
+                        continue;
+                    }
+
+                    peerDescription = 
socket.getRemoteSocketAddress().toString();
+
+                    if (socket.isClosed()) {
+                        logger.debug("Connection to Peer {} is closed. Will 
not attempt to communicate over this Socket.", peerDescription);
+                        continue;
+                    }
+
+                    logger.debug("Receiving FlowFiles from Peer {}", 
peerDescription);
+                    loadBalanceProtocol.receiveFlowFiles(socket);
+
+                    if (socket.isConnected()) {
+                        logger.debug("Finished receiving FlowFiles from Peer 
{}. Will recycle connection.", peerDescription);
+                        connectionQueue.offer(socket);
+                    } else {
+                        logger.debug("Finished receiving FlowFiles from Peer 
{}. Socket is no longer connected so will not recycle connection.", 
peerDescription);
+                    }
+                } catch (final Exception e) {
+                    if (socket != null) {
+                        try {
+                            socket.close();
+                        } catch (final IOException ioe) {
+                            e.addSuppressed(ioe);
+                        }
+                    }
+
+                    logger.error("Failed to communicate with Peer {}", 
peerDescription, e);
+                    eventReporter.reportEvent(Severity.ERROR, "Load Balanced 
Connection", "Failed to receive FlowFiles for Load Balancing due to " + e);
+                }
+            }
+
+            logger.info("Connection Load Balance Server shutdown. Will no 
longer handle incoming requests.");
+        }
+    }
+
+
+    private class AcceptConnection implements Runnable {
+        private final ServerSocket serverSocket;
+        private volatile boolean stopped = false;
+
+        public AcceptConnection(final ServerSocket serverSocket) {
+            this.serverSocket = serverSocket;
+        }
+
+        public void stop() {
+            stopped = true;
+        }
+
+        @Override
+        public void run() {
+            try {
+                serverSocket.setSoTimeout(1000);
+            } catch (final Exception e) {
+                logger.error("Failed to set soTimeout on Server Socket for 
Load Balancing data across cluster", e);
+            }
+
+            while (!stopped) {
+                try {
+                    final Socket socket;
+                    try {
+                        socket = serverSocket.accept();
+                    } catch (final SocketTimeoutException ste) {
+                        continue;
+                    }
+
+                    socket.setSoTimeout(connectionTimeoutMillis);
+                    connectionQueue.offer(socket);
+                } catch (final Exception e) {
+                    logger.error("{} Failed to accept connection from other 
node in cluster", ConnectionLoadBalanceServer.this, e);
+                }
+            }
+
+            try {
+                serverSocket.close();
+            } catch (final Exception e) {
+                logger.warn("Failed to properly shutdown Server Socket for 
Load Balancing", e);
+            }
+        }
+    }
+
+    /**
+     * @return a description of the form {@code ConnectionLoadBalanceServer[hostname=..., port=..., secure=...]},
+     *         where {@code secure} is true when an SSL context was supplied
+     */
+    @Override
+    public String toString() {
+        final boolean secure = sslContext != null;
+        final StringBuilder description = new StringBuilder("ConnectionLoadBalanceServer[hostname=");
+        description.append(hostname);
+        description.append(", port=").append(port);
+        description.append(", secure=").append(secure);
+        description.append("]");
+        return description.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/LoadBalanceAuthorizer.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/LoadBalanceAuthorizer.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/LoadBalanceAuthorizer.java
new file mode 100644
index 0000000..3a716e2
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/LoadBalanceAuthorizer.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.queue.clustered.server;
+
+import java.util.Collection;
+
+/**
+ * Decides whether a client may take part in load balancing, based on the identities that the client presents.
+ */
+public interface LoadBalanceAuthorizer {
+    /**
+     * Checks the given client identities and returns normally if the client is authorized.
+     *
+     * @param clientIdentities the identities presented by the client
+     *            (ClusterLoadBalanceAuthorizer treats null as an unsecured connection and authorizes it)
+     * @throws NotAuthorizedException if the client is not authorized
+     */
+    void authorize(Collection<String> clientIdentities) throws 
NotAuthorizedException;
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/LoadBalanceProtocol.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/LoadBalanceProtocol.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/LoadBalanceProtocol.java
new file mode 100644
index 0000000..5a74ebc
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/LoadBalanceProtocol.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.queue.clustered.server;
+
+import java.io.IOException;
+import java.net.Socket;
+
+/**
+ * Receiving side of the load-balance exchange: given a socket connected to a peer, an implementation receives
+ * the FlowFiles that the peer sends over it.
+ */
+public interface LoadBalanceProtocol {
+
+    /**
+     * Receives FlowFiles from the peer attached to the socket
+     *
+     * @param socket the socket to read from and write to
+     *
+     * @throws TransactionAbortedException if the transaction was aborted
+     *             (not listed in the throws clause; presumably a subtype of IOException - TODO confirm)
+     * @throws IOException if unable to communicate with the peer
+     */
+    void receiveFlowFiles(Socket socket) throws IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/NotAuthorizedException.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/NotAuthorizedException.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/NotAuthorizedException.java
new file mode 100644
index 0000000..8aa1d53
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/NotAuthorizedException.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.queue.clustered.server;
+
+import java.io.IOException;
+
+/**
+ * Thrown when a client is not authorized to take part in load balancing. It is an {@link IOException}, so code
+ * that handles I/O failures on the connection handles this as well.
+ */
+public class NotAuthorizedException extends IOException {
+    /**
+     * @param message explanation of why authorization was refused
+     */
+    public NotAuthorizedException(final String message) {
+        super(message);
+    }
+}

Reply via email to