http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSocketLoadBalancedFlowFileQueue.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSocketLoadBalancedFlowFileQueue.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSocketLoadBalancedFlowFileQueue.java
new file mode 100644
index 0000000..971770a
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSocketLoadBalancedFlowFileQueue.java
@@ -0,0 +1,514 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.queue.clustered;
+
+import org.apache.nifi.cluster.coordination.ClusterCoordinator;
+import org.apache.nifi.cluster.coordination.ClusterTopologyEventListener;
+import org.apache.nifi.cluster.protocol.NodeIdentifier;
+import org.apache.nifi.connectable.Connection;
+import org.apache.nifi.controller.MockFlowFileRecord;
+import org.apache.nifi.controller.MockSwapManager;
+import org.apache.nifi.controller.ProcessScheduler;
+import org.apache.nifi.controller.queue.NopConnectionEventListener;
+import org.apache.nifi.controller.queue.QueueSize;
+import 
org.apache.nifi.controller.queue.clustered.client.async.AsyncLoadBalanceClientRegistry;
+import 
org.apache.nifi.controller.queue.clustered.partition.FlowFilePartitioner;
+import org.apache.nifi.controller.queue.clustered.partition.QueuePartition;
+import 
org.apache.nifi.controller.queue.clustered.partition.RoundRobinPartitioner;
+import org.apache.nifi.controller.repository.ContentRepository;
+import org.apache.nifi.controller.repository.FlowFileRecord;
+import org.apache.nifi.controller.repository.FlowFileRepository;
+import org.apache.nifi.controller.repository.SwapSummary;
+import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import 
org.apache.nifi.controller.repository.claim.StandardResourceClaimManager;
+import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.provenance.ProvenanceEventRepository;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestSocketLoadBalancedFlowFileQueue {
+
+    // Mocked collaborators wired into the queue under test in setup().
+    private Connection connection;
+    private FlowFileRepository flowFileRepo;
+    private ContentRepository contentRepo;
+    private ProvenanceEventRepository provRepo;
+    private ResourceClaimManager claimManager;
+    // Mocked coordinator; tests drive cluster-topology changes through it.
+    private ClusterCoordinator clusterCoordinator;
+    private MockSwapManager swapManager;
+    private EventReporter eventReporter;
+    // Queue under test, created fresh for each test in setup().
+    private SocketLoadBalancedFlowFileQueue queue;
+    // Listener the queue registers with the coordinator; captured in setup() so tests can fire topology events directly.
+    private volatile ClusterTopologyEventListener clusterTopologyEventListener;
+
+    // Node identifiers returned by the mocked coordinator; the first entry is the local node.
+    private List<NodeIdentifier> nodeIds;
+    // Monotonically increasing port so every created NodeIdentifier gets unique ports.
+    private int nodePort = 4096;
+
+    @Before
+    public void setup() {
+        MockFlowFileRecord.resetIdGenerator();
+        connection = mock(Connection.class);
+        when(connection.getIdentifier()).thenReturn("unit-test");
+
+        flowFileRepo = mock(FlowFileRepository.class);
+        contentRepo = mock(ContentRepository.class);
+        provRepo = mock(ProvenanceEventRepository.class);
+        claimManager = new StandardResourceClaimManager();
+        clusterCoordinator = mock(ClusterCoordinator.class);
+        swapManager = new MockSwapManager();
+        eventReporter = EventReporter.NO_OP;
+
+        final NodeIdentifier localNodeIdentifier = createNodeIdentifier();
+
+        nodeIds = new ArrayList<>();
+        nodeIds.add(localNodeIdentifier);
+        nodeIds.add(createNodeIdentifier());
+        nodeIds.add(createNodeIdentifier());
+
+        Mockito.doAnswer(new Answer<Set<NodeIdentifier>>() {
+            @Override
+            public Set<NodeIdentifier> answer(InvocationOnMock invocation) 
throws Throwable {
+                return new HashSet<>(nodeIds);
+            }
+        }).when(clusterCoordinator).getNodeIdentifiers();
+
+        
when(clusterCoordinator.getLocalNodeIdentifier()).thenReturn(localNodeIdentifier);
+
+        doAnswer(new Answer() {
+            @Override
+            public Object answer(final InvocationOnMock invocation) throws 
Throwable {
+                clusterTopologyEventListener = invocation.getArgumentAt(0, 
ClusterTopologyEventListener.class);
+                return null;
+            }
+        
}).when(clusterCoordinator).registerEventListener(Mockito.any(ClusterTopologyEventListener.class));
+
+        final ProcessScheduler scheduler = mock(ProcessScheduler.class);
+
+        final AsyncLoadBalanceClientRegistry registry = 
mock(AsyncLoadBalanceClientRegistry.class);
+        queue = new SocketLoadBalancedFlowFileQueue("unit-test", new 
NopConnectionEventListener(), scheduler, flowFileRepo, provRepo,
+            contentRepo, claimManager, clusterCoordinator, registry, 
swapManager, 10000, eventReporter);
+    }
+
+    private NodeIdentifier createNodeIdentifier() {
+        return new NodeIdentifier(UUID.randomUUID().toString(), "localhost", 
nodePort++, "localhost", nodePort++,
+            "localhost", nodePort++, "localhost", nodePort++, nodePort++, 
true, Collections.emptySet());
+    }
+
+    @Test
+    public void testBinsAccordingToPartitioner() {
+        final FlowFilePartitioner partitioner = new 
StaticFlowFilePartitioner(1);
+        queue.setFlowFilePartitioner(partitioner);
+
+        final QueuePartition desiredPartition = queue.getPartition(1);
+        for (int i = 0; i < 100; i++) {
+            final MockFlowFileRecord flowFile = new MockFlowFileRecord(0L);
+            final QueuePartition partition = 
queue.putAndGetPartition(flowFile);
+            assertSame(desiredPartition, partition);
+        }
+    }
+
+    @Test
+    public void testPutAllBinsFlowFilesSeparately() {
+        // Partition data based on size. FlowFiles with 0 bytes will go to 
partition 0 (local partition),
+        // FlowFiles with 1 byte will go to partition 1, and FlowFiles with 2 
bytes will go to partition 2.
+        final FlowFilePartitioner partitioner = new FlowFileSizePartitioner();
+        queue.setFlowFilePartitioner(partitioner);
+
+        // Add 3 FlowFiles for each size
+        final List<FlowFileRecord> flowFiles = new ArrayList<>();
+        for (int i = 0; i < 3; i++) {
+            flowFiles.add(new MockFlowFileRecord(0));
+            flowFiles.add(new MockFlowFileRecord(1));
+            flowFiles.add(new MockFlowFileRecord(2));
+        }
+
+        final Map<QueuePartition, List<FlowFileRecord>> partitionMap = 
queue.putAllAndGetPartitions(flowFiles);
+        assertEquals(3, partitionMap.size());
+
+        // For each partition, get the List of FlowFiles added to it, then 
verify that there are 3 FlowFiles with that size.
+        for (int i = 0; i < 3; i++) {
+            final QueuePartition partition = queue.getPartition(i);
+            final List<FlowFileRecord> flowFilesForPartition = 
partitionMap.get(partition);
+            assertNotNull(flowFilesForPartition);
+            assertEquals(3, flowFilesForPartition.size());
+
+            for (final FlowFileRecord flowFile : flowFilesForPartition) {
+                assertEquals(i, flowFile.getSize());
+            }
+        }
+    }
+
+    private int determineRemotePartitionIndex() {
+        final QueuePartition localPartition = queue.getLocalPartition();
+        if (queue.getPartition(0) == localPartition) {
+            return 1;
+        } else {
+            return 0;
+        }
+    }
+
+    private int determineLocalPartitionIndex() {
+        final QueuePartition localPartition = queue.getLocalPartition();
+        for (int i=0; i < clusterCoordinator.getNodeIdentifiers().size(); i++) 
{
+            if (queue.getPartition(i) == localPartition) {
+                return i;
+            }
+        }
+
+        throw new IllegalStateException("Could not determine local partition 
index");
+    }
+
+    @Test
+    public void testIsEmptyWhenFlowFileInRemotePartition() {
+        queue.setFlowFilePartitioner(new 
StaticFlowFilePartitioner(determineRemotePartitionIndex()));
+
+        assertTrue(queue.isEmpty());
+        assertTrue(queue.isActiveQueueEmpty());
+        assertEquals(new QueueSize(0, 0L), queue.size());
+
+        queue.put(new MockFlowFileRecord(0L));
+        assertFalse(queue.isEmpty());
+        assertTrue(queue.isActiveQueueEmpty());
+        assertEquals(new QueueSize(1, 0L), queue.size());
+
+        assertNull(queue.poll(new HashSet<>()));
+        assertFalse(queue.isEmpty());
+        assertTrue(queue.isActiveQueueEmpty());
+        assertEquals(new QueueSize(1, 0L), queue.size());
+    }
+
+    @Test
+    public void testIsEmptyWhenFlowFileInLocalPartition() {
+        queue.setFlowFilePartitioner(new 
StaticFlowFilePartitioner(determineLocalPartitionIndex()));
+
+        // Ensure queue is empty
+        assertTrue(queue.isEmpty());
+        assertTrue(queue.isActiveQueueEmpty());
+        assertEquals(new QueueSize(0, 0L), queue.size());
+
+        // add a flowfile
+        final FlowFileRecord flowFile = new MockFlowFileRecord(0L);
+        queue.put(flowFile);
+        assertFalse(queue.isEmpty());
+        assertFalse(queue.isActiveQueueEmpty());
+        assertEquals(new QueueSize(1, 0L), queue.size());
+
+        // Ensure that we get the same FlowFile back. This will not decrement
+        // the queue size, only acknowledging the FlowFile will do that.
+        assertSame(flowFile, queue.poll(new HashSet<>()));
+        assertFalse(queue.isEmpty());
+        assertTrue(queue.isActiveQueueEmpty());
+        assertEquals(new QueueSize(1, 0L), queue.size());
+
+        // Acknowledging FlowFile should reduce queue size
+        queue.acknowledge(flowFile);
+        assertTrue(queue.isEmpty());
+        assertTrue(queue.isActiveQueueEmpty());
+        assertEquals(new QueueSize(0, 0L), queue.size());
+
+        // Add FlowFile back in, poll it to ensure that we get it back, and
+        // then acknowledge as a Collection and ensure the correct sizes.
+        queue.put(flowFile);
+        assertFalse(queue.isEmpty());
+        assertFalse(queue.isActiveQueueEmpty());
+        assertEquals(new QueueSize(1, 0L), queue.size());
+
+        assertSame(flowFile, queue.poll(new HashSet<>()));
+        assertFalse(queue.isEmpty());
+        assertTrue(queue.isActiveQueueEmpty());
+        assertEquals(new QueueSize(1, 0L), queue.size());
+
+        queue.acknowledge(Collections.singleton(flowFile));
+        assertTrue(queue.isEmpty());
+        assertTrue(queue.isActiveQueueEmpty());
+        assertEquals(new QueueSize(0, 0L), queue.size());
+    }
+
+    @Test
+    public void testGetFlowFile() throws IOException {
+        queue.setFlowFilePartitioner(new FlowFileSizePartitioner());
+
+        final Map<String, String> localAttributes = 
Collections.singletonMap("uuid", "local");
+        final MockFlowFileRecord localFlowFile = new 
MockFlowFileRecord(localAttributes, determineLocalPartitionIndex());
+
+        final Map<String, String> remoteAttributes = 
Collections.singletonMap("uuid", "remote");
+        final MockFlowFileRecord remoteFlowFile = new 
MockFlowFileRecord(remoteAttributes, determineRemotePartitionIndex());
+
+        queue.put(localFlowFile);
+        queue.put(remoteFlowFile);
+
+        assertSame(localFlowFile, queue.getFlowFile("local"));
+        assertNull(queue.getFlowFile("remote"));
+        assertNull(queue.getFlowFile("other"));
+    }
+
+
+    @Test
+    public void testRecoverSwapFiles() throws IOException {
+        for (int partitionIndex = 0; partitionIndex < 3; partitionIndex++) {
+            final String partitionName = 
queue.getPartition(partitionIndex).getSwapPartitionName();
+
+            final List<FlowFileRecord> flowFiles = new ArrayList<>();
+            for (int i = 0; i < 100; i++) {
+                flowFiles.add(new MockFlowFileRecord(100L));
+            }
+
+            swapManager.swapOut(flowFiles, queue, partitionName);
+        }
+
+        final List<FlowFileRecord> flowFiles = new ArrayList<>();
+        for (int i = 0; i < 100; i++) {
+            flowFiles.add(new MockFlowFileRecord(100L));
+        }
+
+        swapManager.swapOut(flowFiles, queue, "other-partition");
+
+        final SwapSummary swapSummary = queue.recoverSwappedFlowFiles();
+        assertEquals(399L, swapSummary.getMaxFlowFileId().longValue());
+        assertEquals(400, swapSummary.getQueueSize().getObjectCount());
+        assertEquals(400 * 100L, swapSummary.getQueueSize().getByteCount());
+    }
+
+
+    @Test(timeout = 10000)
+    public void testChangeInClusterTopologyTriggersRebalance() throws 
InterruptedException {
+        // Create partitioner that sends first 2 FlowFiles to Partition 0, 
next 2 to Partition 1, and then next 4 to Partition 3.
+        queue.setFlowFilePartitioner(new StaticSequencePartitioner(new int[] 
{0, 0, 1, 1, 3, 3, 3, 3}, true));
+
+        for (int i = 0; i < 4; i++) {
+            queue.put(new MockFlowFileRecord());
+        }
+
+        assertEquals(2, queue.getPartition(0).size().getObjectCount());
+        assertEquals(2, queue.getPartition(1).size().getObjectCount());
+        assertEquals(0, queue.getPartition(2).size().getObjectCount());
+
+        final Set<NodeIdentifier> updatedNodeIdentifiers = new 
HashSet<>(nodeIds);
+        // Add a Node Identifier with an of ID consisting of a bunch of Z's so 
that the new partition will be Partition Number 3.
+        updatedNodeIdentifiers.add(new NodeIdentifier("ZZZZZZZZZZZZZZ", 
"localhost", nodePort++, "localhost", nodePort++,
+            "localhost", nodePort++, "localhost", nodePort++, nodePort++, 
true, Collections.emptySet()));
+
+        queue.setNodeIdentifiers(updatedNodeIdentifiers, false);
+
+        final int[] expectedPartitionSizes = new int[] {0, 0, 0, 4};
+        final int[] partitionSizes = new int[4];
+        while (!Arrays.equals(expectedPartitionSizes, partitionSizes)) {
+            Thread.sleep(10L);
+
+            for (int i = 0; i < 4; i++) {
+                partitionSizes[i] = 
queue.getPartition(i).size().getObjectCount();
+            }
+        }
+    }
+
+    @Test(timeout = 10000)
+    public void 
testChangeInClusterTopologyTriggersRebalanceOnlyOnRemovedNodeIfNecessary() 
throws InterruptedException {
+        // Create partitioner that sends first 2 FlowFiles to Partition 0, 
next 2 to Partition 1, and then next 4 to Partition 3.
+        queue.setFlowFilePartitioner(new StaticSequencePartitioner(new int[] 
{0, 1, 2, 2, 0, 1}, false));
+
+        for (int i = 0; i < 4; i++) {
+            queue.put(new MockFlowFileRecord());
+        }
+
+        assertEquals(1, queue.getPartition(0).size().getObjectCount());
+        assertEquals(1, queue.getPartition(1).size().getObjectCount());
+        assertEquals(2, queue.getPartition(2).size().getObjectCount());
+
+        final Set<NodeIdentifier> updatedNodeIdentifiers = new HashSet<>();
+        updatedNodeIdentifiers.add(nodeIds.get(0));
+        updatedNodeIdentifiers.add(nodeIds.get(1));
+        queue.setNodeIdentifiers(updatedNodeIdentifiers, false);
+
+        final int[] expectedPartitionSizes = new int[] {2, 2};
+        final int[] partitionSizes = new int[2];
+        while (!Arrays.equals(expectedPartitionSizes, partitionSizes)) {
+            Thread.sleep(10L);
+
+            for (int i = 0; i < 2; i++) {
+                partitionSizes[i] = 
queue.getPartition(i).size().getObjectCount();
+            }
+        }
+    }
+
+    @Test(timeout = 10000)
+    public void testChangeInPartitionerTriggersRebalance() throws 
InterruptedException {
+        // Create partitioner that sends first 2 FlowFiles to Partition 0, 
next 2 to Partition 1, and then next 4 to Partition 3.
+        queue.setFlowFilePartitioner(new StaticSequencePartitioner(new int[] 
{0, 1, 0, 1}, false));
+
+        for (int i = 0; i < 4; i++) {
+            queue.put(new MockFlowFileRecord());
+        }
+
+        assertEquals(2, queue.getPartition(0).size().getObjectCount());
+        assertEquals(2, queue.getPartition(1).size().getObjectCount());
+        assertEquals(0, queue.getPartition(2).size().getObjectCount());
+
+        queue.setFlowFilePartitioner(new StaticSequencePartitioner(new int[] 
{0, 1, 2, 2}, true));
+
+        final int[] expectedPartitionSizes = new int[] {1, 1, 2};
+        assertPartitionSizes(expectedPartitionSizes);
+    }
+
+    @Test(timeout = 100000)
+    public void testLocalNodeIdentifierSet() throws InterruptedException {
+        nodeIds.clear();
+
+        final NodeIdentifier id1 = createNodeIdentifier();
+        final NodeIdentifier id2 = createNodeIdentifier();
+        final NodeIdentifier id3 = createNodeIdentifier();
+        nodeIds.add(id1);
+        nodeIds.add(id2);
+        nodeIds.add(id3);
+
+        when(clusterCoordinator.getLocalNodeIdentifier()).thenReturn(null);
+
+        final AsyncLoadBalanceClientRegistry registry = 
mock(AsyncLoadBalanceClientRegistry.class);
+        queue = new SocketLoadBalancedFlowFileQueue("unit-test", new 
NopConnectionEventListener(), mock(ProcessScheduler.class), flowFileRepo, 
provRepo,
+                contentRepo, claimManager, clusterCoordinator, registry, 
swapManager, 10000, eventReporter);
+
+        queue.setFlowFilePartitioner(new RoundRobinPartitioner());
+
+        // Queue up data without knowing the local node id.
+        final Map<String, String> attributes = new HashMap<>();
+        for (int i=0; i < 6; i++) {
+            attributes.put("i", String.valueOf(i));
+            queue.put(new MockFlowFileRecord(attributes, 0));
+        }
+
+        for (int i=0; i < 3; i++) {
+            assertEquals(2, queue.getPartition(i).size().getObjectCount());
+        }
+
+        assertEquals(0, queue.getLocalPartition().size().getObjectCount());
+
+        when(clusterCoordinator.getLocalNodeIdentifier()).thenReturn(id1);
+        clusterTopologyEventListener.onLocalNodeIdentifierSet(id1);
+
+        assertPartitionSizes(new int[] {2, 2, 2});
+
+        while (queue.getLocalPartition().size().getObjectCount() != 2) {
+            Thread.sleep(10L);
+        }
+    }
+
+    private void assertPartitionSizes(final int[] expectedSizes) {
+        final int[] partitionSizes = new int[queue.getPartitionCount()];
+        while (!Arrays.equals(expectedSizes, partitionSizes)) {
+            try {
+                Thread.sleep(10L);
+            } catch (InterruptedException e) {
+                Assert.fail("Interrupted");
+            }
+
+            for (int i = 0; i < partitionSizes.length; i++) {
+                partitionSizes[i] = 
queue.getPartition(i).size().getObjectCount();
+            }
+        }
+    }
+
+
+    private static class StaticFlowFilePartitioner implements 
FlowFilePartitioner {
+        private final int partitionIndex;
+
+        public StaticFlowFilePartitioner(final int partition) {
+            this.partitionIndex = partition;
+        }
+
+        @Override
+        public QueuePartition getPartition(final FlowFileRecord flowFile, 
final QueuePartition[] partitions, final QueuePartition localPartition) {
+            return partitions[partitionIndex];
+        }
+
+        @Override
+        public boolean isRebalanceOnClusterResize() {
+            return false;
+        }
+
+        @Override
+        public boolean isRebalanceOnFailure() {
+            return false;
+        }
+    }
+
+    private static class FlowFileSizePartitioner implements 
FlowFilePartitioner {
+        @Override
+        public QueuePartition getPartition(final FlowFileRecord flowFile, 
final QueuePartition[] partitions, final QueuePartition localPartition) {
+            return partitions[(int) flowFile.getSize()];
+        }
+
+        @Override
+        public boolean isRebalanceOnClusterResize() {
+            return false;
+        }
+
+        @Override
+        public boolean isRebalanceOnFailure() {
+            return false;
+        }
+    }
+
+    private static class StaticSequencePartitioner implements 
FlowFilePartitioner {
+        private final int[] partitionIndices;
+        private final boolean requireRebalance;
+        private int index = 0;
+
+        public StaticSequencePartitioner(final int[] partitions, final boolean 
requireRebalance) {
+            this.partitionIndices = partitions;
+            this.requireRebalance = requireRebalance;
+        }
+
+        @Override
+        public QueuePartition getPartition(final FlowFileRecord flowFile, 
final QueuePartition[] partitions, final QueuePartition localPartition) {
+            final int partitionIndex = partitionIndices[index++];
+            return partitions[partitionIndex];
+        }
+
+        @Override
+        public boolean isRebalanceOnClusterResize() {
+            return requireRebalance;
+        }
+
+        @Override
+        public boolean isRebalanceOnFailure() {
+            return false;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSwappablePriorityQueue.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSwappablePriorityQueue.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSwappablePriorityQueue.java
new file mode 100644
index 0000000..71ad257
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSwappablePriorityQueue.java
@@ -0,0 +1,471 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.queue.clustered;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.nifi.controller.MockFlowFileRecord;
+import org.apache.nifi.controller.MockSwapManager;
+import org.apache.nifi.controller.queue.DropFlowFileAction;
+import org.apache.nifi.controller.queue.DropFlowFileRequest;
+import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.queue.QueueSize;
+import org.apache.nifi.controller.queue.SwappablePriorityQueue;
+import org.apache.nifi.controller.repository.FlowFileRecord;
+import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.flowfile.FlowFilePrioritizer;
+import org.apache.nifi.util.MockFlowFile;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class TestSwappablePriorityQueue {
+
+    private MockSwapManager swapManager;
+    private final EventReporter eventReporter = EventReporter.NO_OP;
+    private final FlowFileQueue flowFileQueue = 
Mockito.mock(FlowFileQueue.class);
+    // Drop action that simply reports the count and total byte size of the given FlowFiles as dropped.
+    private final DropFlowFileAction dropAction = (flowFiles, requestor) -> {
+        return new QueueSize(flowFiles.size(), 
flowFiles.stream().mapToLong(FlowFileRecord::getSize).sum());
+    };
+
+    // Queue under test; created in setup() with a swap threshold of 10,000 FlowFiles.
+    private SwappablePriorityQueue queue;
+
+    @Before
+    public void setup() {
+        swapManager = new MockSwapManager();
+
+        when(flowFileQueue.getIdentifier()).thenReturn("unit-test");
+        queue = new SwappablePriorityQueue(swapManager, 10000, eventReporter, 
flowFileQueue, dropAction, "local");
+    }
+
+
+    @Test
+    public void testPrioritizer() {
+        final FlowFilePrioritizer prioritizer = (o1, o2) -> 
Long.compare(o1.getId(), o2.getId());
+        queue.setPriorities(Collections.singletonList(prioritizer));
+
+        for (int i = 0; i < 5000; i++) {
+            queue.put(new MockFlowFile(i));
+        }
+
+        final Set<FlowFileRecord> expiredRecords = new HashSet<>();
+        for (int i = 0; i < 5000; i++) {
+            final FlowFileRecord polled = queue.poll(expiredRecords, 500000L);
+            assertEquals(i, polled.getId());
+        }
+
+        // We can add flowfiles in reverse order (highest ID first) and we 
should still get the same order back when polling
+        for (int i = 0; i < 5000; i++) {
+            queue.put(new MockFlowFile(5000 - i));
+        }
+        for (int i = 0; i < 5000; i++) {
+            final FlowFileRecord polled = queue.poll(expiredRecords, 500000L);
+            // ID's will start at 1, since the last FlowFile added will have 
ID of 5000 - 4999
+            assertEquals(i + 1, polled.getId());
+        }
+
+        // Add FlowFiles again, then change prioritizer and ensure that the 
order is updated
+        for (int i = 0; i < 5000; i++) {
+            queue.put(new MockFlowFile(i));
+        }
+
+        final FlowFilePrioritizer reversePrioritizer = (o1, o2) -> 
Long.compare(o2.getId(), o1.getId());
+        queue.setPriorities(Collections.singletonList(reversePrioritizer));
+
+        for (int i = 0; i < 5000; i++) {
+            final FlowFileRecord polled = queue.poll(expiredRecords, 500000L);
+            // ID's will start at 4999, since the last FlowFile added will 
have ID of 4999 (i < 5000, not i <= 5000).
+            assertEquals(5000 - i - 1, polled.getId());
+        }
+    }
+
+    @Test
+    public void testPollWithOnlyExpiredFlowFile() {
+        final FlowFileRecord expiredFlowFile = mock(FlowFileRecord.class);
+        
when(expiredFlowFile.getEntryDate()).thenReturn(System.currentTimeMillis() - 
5000L);
+        queue.put(expiredFlowFile);
+
+        final Set<FlowFileRecord> expiredRecords = new HashSet<>();
+        final FlowFileRecord polled = queue.poll(expiredRecords, 4999);
+        assertNull(polled);
+
+        assertEquals(1, expiredRecords.size());
+        final FlowFileRecord expired = expiredRecords.iterator().next();
+        assertSame(expiredFlowFile, expired);
+    }
+
+    @Test
+    public void testPollWithExpiredAndUnexpired() {
+        final SwappablePriorityQueue queue = new 
SwappablePriorityQueue(swapManager, 100, eventReporter, flowFileQueue, 
dropAction, "local");
+
+        final FlowFileRecord expiredFlowFile = mock(FlowFileRecord.class);
+        
when(expiredFlowFile.getEntryDate()).thenReturn(System.currentTimeMillis() - 
5000L);
+        queue.put(expiredFlowFile);
+
+        final FlowFileRecord unexpiredFlowFile = mock(FlowFileRecord.class);
+        
when(unexpiredFlowFile.getEntryDate()).thenReturn(System.currentTimeMillis() + 
500000L);
+        queue.put(unexpiredFlowFile);
+
+        final Set<FlowFileRecord> expiredRecords = new HashSet<>();
+        final FlowFileRecord polled = queue.poll(expiredRecords, 4999);
+        assertSame(unexpiredFlowFile, polled);
+
+        assertEquals(1, expiredRecords.size());
+        final FlowFileRecord expired = expiredRecords.iterator().next();
+        assertSame(expiredFlowFile, expired);
+    }
+
+    @Test
+    public void testEmpty() {
+        assertTrue(queue.isEmpty());
+        assertTrue(queue.isActiveQueueEmpty());
+
+        for (int i = 0; i < 9; i++) {
+            queue.put(new MockFlowFileRecord());
+            assertFalse(queue.isEmpty());
+            assertFalse(queue.isActiveQueueEmpty());
+        }
+
+        queue.put(new MockFlowFileRecord());
+        assertFalse(queue.isEmpty());
+        assertFalse(queue.isActiveQueueEmpty());
+
+        final Set<FlowFileRecord> expiredRecords = new HashSet<>();
+        final FlowFileRecord polled = queue.poll(expiredRecords, 500000);
+        assertNotNull(polled);
+        assertTrue(expiredRecords.isEmpty());
+
+        assertFalse(queue.isEmpty());
+        assertFalse(queue.isActiveQueueEmpty());
+
+        // queue is still full because FlowFile has not yet been acknowledged.
+        queue.acknowledge(polled);
+
+        // FlowFile has been acknowledged; queue should no longer be full.
+        assertFalse(queue.isEmpty());
+        assertFalse(queue.isActiveQueueEmpty());
+    }
+
+    @Test
+    public void testSwapOutOccurs() {
+        for (int i = 0; i < 10000; i++) {
+            queue.put(new MockFlowFileRecord());
+            assertEquals(0, swapManager.swapOutCalledCount);
+            assertEquals(i + 1, queue.size().getObjectCount());
+            assertEquals(i + 1, queue.size().getByteCount());
+        }
+
+        for (int i = 0; i < 9999; i++) {
+            queue.put(new MockFlowFileRecord());
+            assertEquals(0, swapManager.swapOutCalledCount);
+            assertEquals(i + 10001, queue.size().getObjectCount());
+            assertEquals(i + 10001, queue.size().getByteCount());
+        }
+
+        queue.put(new MockFlowFileRecord(1000));
+        assertEquals(1, swapManager.swapOutCalledCount);
+        assertEquals(20000, queue.size().getObjectCount());
+        assertEquals(20999, queue.size().getByteCount());
+
+        assertEquals(10000, 
queue.getQueueDiagnostics().getActiveQueueSize().getObjectCount());
+    }
+
+    @Test
+    public void testLowestPrioritySwappedOutFirst() {
+        final List<FlowFilePrioritizer> prioritizers = new ArrayList<>();
+        prioritizers.add((o1, o2) -> Long.compare(o1.getSize(), o2.getSize()));
+        queue.setPriorities(prioritizers);
+
+        long maxSize = 20000;
+        for (int i = 1; i <= 20000; i++) {
+            queue.put(new MockFlowFileRecord(maxSize - i));
+        }
+
+        assertEquals(1, swapManager.swapOutCalledCount);
+        assertEquals(20000, queue.size().getObjectCount());
+
+        assertEquals(10000, 
queue.getQueueDiagnostics().getActiveQueueSize().getObjectCount());
+        final List<FlowFileRecord> flowFiles = queue.poll(Integer.MAX_VALUE, 
new HashSet<FlowFileRecord>(), 500000);
+        assertEquals(10000, flowFiles.size());
+        for (int i = 0; i < 10000; i++) {
+            assertEquals(i, flowFiles.get(i).getSize());
+        }
+    }
+
+
    @Test
    public void testSwapIn() {
        // 20,000 puts reaches the swap trigger: one swap file is written out.
        for (int i = 1; i <= 20000; i++) {
            queue.put(new MockFlowFileRecord());
        }

        assertEquals(1, swapManager.swappedOut.size());

        // One additional put must not produce a second swap file.
        queue.put(new MockFlowFileRecord());
        assertEquals(1, swapManager.swappedOut.size());

        // Drain all but one of the active FlowFiles. Each poll moves exactly one
        // FlowFile into the unacknowledged queue until it is acknowledged.
        final Set<FlowFileRecord> exp = new HashSet<>();
        for (int i = 0; i < 9999; i++) {
            final FlowFileRecord flowFile = queue.poll(exp, 500000);
            assertNotNull(flowFile);
            assertEquals(1, queue.getQueueDiagnostics().getUnacknowledgedQueueSize().getObjectCount());
            assertEquals(1, queue.getQueueDiagnostics().getUnacknowledgedQueueSize().getByteCount());

            queue.acknowledge(Collections.singleton(flowFile));
            assertEquals(0, queue.getQueueDiagnostics().getUnacknowledgedQueueSize().getObjectCount());
            assertEquals(0, queue.getQueueDiagnostics().getUnacknowledgedQueueSize().getByteCount());
        }

        // Exactly one FlowFile remains active; polling it must not yet swap anything in.
        assertEquals(0, swapManager.swapInCalledCount);
        assertEquals(1, queue.getQueueDiagnostics().getActiveQueueSize().getObjectCount());
        assertNotNull(queue.poll(exp, 500000));

        // Active queue is now empty, still without any swap-in having occurred.
        assertEquals(0, swapManager.swapInCalledCount);
        assertEquals(0, queue.getQueueDiagnostics().getActiveQueueSize().getObjectCount());

        assertEquals(1, swapManager.swapOutCalledCount);

        assertNotNull(queue.poll(exp, 500000)); // this should trigger a swap-in of 10,000 records, and then pull 1 off the top.
        assertEquals(1, swapManager.swapInCalledCount);
        assertEquals(9999, queue.getQueueDiagnostics().getActiveQueueSize().getObjectCount());

        assertTrue(swapManager.swappedOut.isEmpty());

        queue.poll(exp, 500000);
    }
+
    @Test
    public void testSwapInWhenThresholdIsLessThanSwapSize() {
        // create a queue where the swap threshold (1,000) is less than the 10k swap-file size
        queue = new SwappablePriorityQueue(swapManager, 1000, eventReporter, flowFileQueue, dropAction, null);

        for (int i = 1; i <= 20000; i++) {
            queue.put(new MockFlowFileRecord());
        }

        // Only one swap file should have been written, even after one more put.
        assertEquals(1, swapManager.swappedOut.size());
        queue.put(new MockFlowFileRecord());
        assertEquals(1, swapManager.swappedOut.size());

        final Set<FlowFileRecord> exp = new HashSet<>();

        // At this point there should be:
        // 1k flow files in the active queue
        // 9,001 flow files in the swap queue
        // 10k flow files swapped to disk

        // Drain all but one of the active FlowFiles, acknowledging each as we go.
        for (int i = 0; i < 999; i++) {
            final FlowFileRecord flowFile = queue.poll(exp, 500000);
            assertNotNull(flowFile);
            assertEquals(1, queue.getQueueDiagnostics().getUnacknowledgedQueueSize().getObjectCount());
            assertEquals(1, queue.getQueueDiagnostics().getUnacknowledgedQueueSize().getByteCount());

            queue.acknowledge(Collections.singleton(flowFile));
            assertEquals(0, queue.getQueueDiagnostics().getUnacknowledgedQueueSize().getObjectCount());
            assertEquals(0, queue.getQueueDiagnostics().getUnacknowledgedQueueSize().getByteCount());
        }

        // The final active FlowFile can be polled without any swap-in occurring.
        assertEquals(0, swapManager.swapInCalledCount);
        assertEquals(1, queue.getQueueDiagnostics().getActiveQueueSize().getObjectCount());
        assertNotNull(queue.poll(exp, 500000));

        assertEquals(0, swapManager.swapInCalledCount);
        assertEquals(0, queue.getQueueDiagnostics().getActiveQueueSize().getObjectCount());

        assertEquals(1, swapManager.swapOutCalledCount);

        assertNotNull(queue.poll(exp, 500000)); // this should trigger a swap-in of 10,000 records, and then pull 1 off the top.
        assertEquals(1, swapManager.swapInCalledCount);
        assertEquals(9999, queue.getQueueDiagnostics().getActiveQueueSize().getObjectCount());

        assertTrue(swapManager.swappedOut.isEmpty());

        queue.poll(exp, 500000);
    }
+
    @Test
    public void testQueueCountsUpdatedWhenIncompleteSwapFile() {
        // Fill to the swap trigger so exactly one swap file is written.
        for (int i = 1; i <= 20000; i++) {
            queue.put(new MockFlowFileRecord());
        }

        assertEquals(20000, queue.size().getObjectCount());
        assertEquals(20000, queue.size().getByteCount());

        assertEquals(1, swapManager.swappedOut.size());

        // when we swap in, cause an IncompleteSwapFileException to be
        // thrown and contain only 9,999 of the 10,000 FlowFiles
        swapManager.enableIncompleteSwapFileException(9999);
        final Set<FlowFileRecord> expired = Collections.emptySet();
        FlowFileRecord flowFile;

        // Drain and acknowledge the 10,000 active FlowFiles first.
        for (int i = 0; i < 10000; i++) {
            flowFile = queue.poll(expired, 500000);
            assertNotNull(flowFile);
            queue.acknowledge(Collections.singleton(flowFile));
        }

        // 10,000 FlowFiles on queue - all swapped out
        assertEquals(10000, queue.size().getObjectCount());
        assertEquals(10000, queue.size().getByteCount());
        assertEquals(1, swapManager.swappedOut.size());
        assertEquals(0, swapManager.swapInCalledCount);

        // Trigger swap in. This will remove 1 FlowFile from queue, leaving 9,999 but
        // on swap in, we will get only 9,999 FlowFiles put onto the queue, and the queue size will
        // be decremented by 10,000 (because the Swap File's header tells us that there are 10K
        // FlowFiles, even though only 9999 are in the swap file)
        flowFile = queue.poll(expired, 500000);
        assertNotNull(flowFile);
        queue.acknowledge(Collections.singleton(flowFile));

        // size should be 9,998 because we lost 1 on Swap In, and then we pulled one above.
        assertEquals(9998, queue.size().getObjectCount());
        assertEquals(9998, queue.size().getByteCount());
        assertEquals(0, swapManager.swappedOut.size());
        assertEquals(1, swapManager.swapInCalledCount);

        // Every remaining FlowFile is still pollable and the counts tick down to zero.
        for (int i = 0; i < 9998; i++) {
            flowFile = queue.poll(expired, 500000);
            assertNotNull("Null FlowFile when i = " + i, flowFile);
            queue.acknowledge(Collections.singleton(flowFile));

            final QueueSize queueSize = queue.size();
            assertEquals(9998 - i - 1, queueSize.getObjectCount());
            assertEquals(9998 - i - 1, queueSize.getByteCount());
        }

        final QueueSize queueSize = queue.size();
        assertEquals(0, queueSize.getObjectCount());
        assertEquals(0L, queueSize.getByteCount());

        // Queue is fully drained; a further poll yields nothing.
        flowFile = queue.poll(expired, 500000);
        assertNull(flowFile);
    }
+
+    @Test(timeout = 120000)
+    public void testDropSwappedFlowFiles() {
+        for (int i = 1; i <= 30000; i++) {
+            queue.put(new MockFlowFileRecord());
+        }
+
+        assertEquals(2, swapManager.swappedOut.size());
+        final DropFlowFileRequest request = new DropFlowFileRequest("Unit 
Test");
+
+        queue.dropFlowFiles(request, "Unit Test");
+
+        assertEquals(0, queue.size().getObjectCount());
+        assertEquals(0, queue.size().getByteCount());
+        assertEquals(0, swapManager.swappedOut.size());
+        assertEquals(2, swapManager.swapInCalledCount);
+    }
+
+
+    @Test(timeout = 5000)
+    public void testGetActiveFlowFilesReturnsAllActiveFlowFiles() throws 
InterruptedException {
+        for (int i = 0; i < 9999; i++) {
+            queue.put(new MockFlowFileRecord());
+        }
+
+        final List<FlowFileRecord> active = queue.getActiveFlowFiles();
+        assertNotNull(active);
+        assertEquals(9999, active.size());
+    }
+
+
+    @Test(timeout = 5000)
+    public void testListFlowFilesResultsLimited() throws InterruptedException {
+        for (int i = 0; i < 30050; i++) {
+            queue.put(new MockFlowFileRecord());
+        }
+
+        final List<FlowFileRecord> activeFlowFiles = 
queue.getActiveFlowFiles();
+        assertNotNull(activeFlowFiles);
+        assertEquals(10000, activeFlowFiles.size());
+    }
+
+
+    @Test
+    public void testOOMEFollowedBySuccessfulSwapIn() {
+        final List<FlowFileRecord> flowFiles = new ArrayList<>();
+        for (int i = 0; i < 50000; i++) {
+            flowFiles.add(new MockFlowFileRecord());
+        }
+
+        queue.putAll(flowFiles);
+
+        swapManager.failSwapInAfterN = 2;
+        swapManager.setSwapInFailure(new OutOfMemoryError("Intentional OOME 
for unit test"));
+
+        final Set<FlowFileRecord> expiredRecords = new HashSet<>();
+        for (int i = 0; i < 30000; i++) {
+            final FlowFileRecord polled = queue.poll(expiredRecords, 500000);
+            assertNotNull(polled);
+        }
+
+        // verify that unexpected ERROR's are handled in such a way that we 
keep retrying
+        for (int i = 0; i < 3; i++) {
+            try {
+                queue.poll(expiredRecords, 500000);
+                Assert.fail("Expected OOME to be thrown");
+            } catch (final OutOfMemoryError oome) {
+                // expected
+            }
+        }
+
+        // verify that unexpected Runtime Exceptions are handled in such a way 
that we keep retrying
+        swapManager.setSwapInFailure(new NullPointerException("Intentional 
OOME for unit test"));
+
+        for (int i = 0; i < 3; i++) {
+            try {
+                queue.poll(expiredRecords, 500000);
+                Assert.fail("Expected NPE to be thrown");
+            } catch (final NullPointerException npe) {
+                // expected
+            }
+        }
+
+        swapManager.failSwapInAfterN = -1;
+
+        for (int i = 0; i < 20000; i++) {
+            final FlowFileRecord polled = queue.poll(expiredRecords, 500000);
+            assertNotNull(polled);
+        }
+
+        queue.acknowledge(flowFiles);
+        assertNull(queue.poll(expiredRecords, 500000));
+        assertEquals(0, 
queue.getQueueDiagnostics().getActiveQueueSize().getObjectCount());
+        assertEquals(0, queue.size().getObjectCount());
+
+        assertTrue(swapManager.swappedOut.isEmpty());
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/client/async/nio/TestLoadBalanceSession.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/client/async/nio/TestLoadBalanceSession.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/client/async/nio/TestLoadBalanceSession.java
new file mode 100644
index 0000000..efa5d73
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/client/async/nio/TestLoadBalanceSession.java
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.queue.clustered.client.async.nio;
+
+import org.apache.nifi.controller.MockFlowFileRecord;
+import org.apache.nifi.controller.queue.LoadBalanceCompression;
+import org.apache.nifi.controller.queue.clustered.FlowFileContentAccess;
+import org.apache.nifi.controller.queue.clustered.SimpleLimitThreshold;
+import 
org.apache.nifi.controller.queue.clustered.client.StandardLoadBalanceFlowFileCodec;
+import 
org.apache.nifi.controller.queue.clustered.client.async.TransactionFailureCallback;
+import 
org.apache.nifi.controller.queue.clustered.protocol.LoadBalanceProtocolConstants;
+import org.apache.nifi.controller.repository.FlowFileRecord;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.nio.channels.SocketChannel;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.zip.CRC32;
+import java.util.zip.CheckedOutputStream;
+import java.util.zip.Checksum;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestLoadBalanceSession {
+
+    private final TransactionFailureCallback NOP_FAILURE_CALLBACK = new 
TransactionFailureCallback() {
+        @Override
+        public void onTransactionFailed(final List<FlowFileRecord> flowFiles, 
final Exception cause, final TransactionPhase transactionPhase) {
+        }
+
+        @Override
+        public boolean isRebalanceOnFailure() {
+            return false;
+        }
+    };
+
+    private ByteArrayOutputStream received;
+    private ServerSocket serverSocket;
+    private int port;
+
+    @Before
+    public void setup() throws IOException {
+        received = new ByteArrayOutputStream();
+
+        serverSocket = new ServerSocket(0);
+        port = serverSocket.getLocalPort();
+
+        final Thread thread = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                try (final Socket socket = serverSocket.accept()) {
+                    final InputStream in = socket.getInputStream();
+                    int data;
+
+                    
socket.getOutputStream().write(LoadBalanceProtocolConstants.VERSION_ACCEPTED);
+                    
socket.getOutputStream().write(LoadBalanceProtocolConstants.SPACE_AVAILABLE);
+                    
socket.getOutputStream().write(LoadBalanceProtocolConstants.CONFIRM_CHECKSUM);
+                    
socket.getOutputStream().write(LoadBalanceProtocolConstants.CONFIRM_COMPLETE_TRANSACTION);
+
+                    while ((data = in.read()) != -1) {
+                        received.write(data);
+                    }
+                } catch (IOException e) {
+                    e.printStackTrace();
+                    Assert.fail();
+                }
+            }
+        });
+        thread.setDaemon(true);
+        thread.start();
+    }
+
+    @After
+    public void shutdown() throws IOException {
+        serverSocket.close();
+    }
+
+    @Test(timeout = 10000)
+    public void testSunnyCase() throws InterruptedException, IOException {
+        final Queue<FlowFileRecord> flowFiles = new LinkedList<>();
+        final FlowFileRecord flowFile1 = new MockFlowFileRecord(5);
+        final FlowFileRecord flowFile2 = new MockFlowFileRecord(8);
+        flowFiles.offer(flowFile1);
+        flowFiles.offer(flowFile2);
+
+        final Map<FlowFileRecord, InputStream> contentMap = new HashMap<>();
+        contentMap.put(flowFile1, new 
ByteArrayInputStream("hello".getBytes()));
+        contentMap.put(flowFile2, new 
ByteArrayInputStream("good-bye".getBytes()));
+
+        final FlowFileContentAccess contentAccess = contentMap::get;
+
+        final RegisteredPartition partition = new 
RegisteredPartition("unit-test-connection", () -> false,
+            flowFiles::poll, NOP_FAILURE_CALLBACK, (ff) -> {}, () -> 
LoadBalanceCompression.DO_NOT_COMPRESS, () -> true);
+
+        final SocketChannel socketChannel = SocketChannel.open(new 
InetSocketAddress("localhost", port));
+
+        socketChannel.configureBlocking(false);
+        final PeerChannel peerChannel = new PeerChannel(socketChannel, null, 
"unit-test");
+        final LoadBalanceSession transaction = new 
LoadBalanceSession(partition, contentAccess, new 
StandardLoadBalanceFlowFileCodec(), peerChannel, 30000,
+            new SimpleLimitThreshold(100, 10_000_000));
+
+        Thread.sleep(100L);
+
+        while (transaction.communicate()) {
+        }
+
+        assertTrue(transaction.isComplete());
+        socketChannel.close();
+
+        final Checksum expectedChecksum = new CRC32();
+        final ByteArrayOutputStream expectedOut = new ByteArrayOutputStream();
+        expectedOut.write(1); // Protocol Version
+
+        final DataOutputStream expectedDos = new DataOutputStream(new 
CheckedOutputStream(expectedOut, expectedChecksum));
+        expectedDos.writeUTF("unit-test-connection");
+
+        expectedDos.write(LoadBalanceProtocolConstants.CHECK_SPACE);
+        expectedDos.write(LoadBalanceProtocolConstants.MORE_FLOWFILES);
+        expectedDos.writeInt(68); // metadata length
+        expectedDos.writeInt(1); // 1 attribute
+        expectedDos.writeInt(4); // length of attribute
+        expectedDos.write("uuid".getBytes());
+        expectedDos.writeInt(flowFile1.getAttribute("uuid").length());
+        expectedDos.write(flowFile1.getAttribute("uuid").getBytes());
+        expectedDos.writeLong(flowFile1.getLineageStartDate()); // lineage 
start date
+        expectedDos.writeLong(flowFile1.getEntryDate()); // entry date
+        expectedDos.write(LoadBalanceProtocolConstants.DATA_FRAME_FOLLOWS);
+        expectedDos.writeShort(5);
+        expectedDos.write("hello".getBytes());
+        expectedDos.write(LoadBalanceProtocolConstants.NO_DATA_FRAME);
+
+        expectedDos.write(LoadBalanceProtocolConstants.MORE_FLOWFILES);
+        expectedDos.writeInt(68); // metadata length
+        expectedDos.writeInt(1); // 1 attribute
+        expectedDos.writeInt(4); // length of attribute
+        expectedDos.write("uuid".getBytes());
+        expectedDos.writeInt(flowFile2.getAttribute("uuid").length());
+        expectedDos.write(flowFile2.getAttribute("uuid").getBytes());
+        expectedDos.writeLong(flowFile2.getLineageStartDate()); // lineage 
start date
+        expectedDos.writeLong(flowFile2.getEntryDate()); // entry date
+        expectedDos.write(LoadBalanceProtocolConstants.DATA_FRAME_FOLLOWS);
+        expectedDos.writeShort(8);
+        expectedDos.write("good-bye".getBytes());
+        expectedDos.write(LoadBalanceProtocolConstants.NO_DATA_FRAME);
+
+        expectedDos.write(LoadBalanceProtocolConstants.NO_MORE_FLOWFILES);
+        expectedDos.writeLong(expectedChecksum.getValue());
+        expectedDos.write(LoadBalanceProtocolConstants.COMPLETE_TRANSACTION);
+
+        final byte[] expectedSent = expectedOut.toByteArray();
+
+        while (received.size() < expectedSent.length) {
+            Thread.sleep(10L);
+        }
+        final byte[] dataSent = received.toByteArray();
+
+        assertArrayEquals(expectedSent, dataSent);
+
+        assertEquals(Arrays.asList(flowFile1, flowFile2), 
transaction.getFlowFilesSent());
+    }
+
+
+    @Test(timeout = 10000)
+    public void testLargeContent() throws InterruptedException, IOException {
+        final byte[] content = new byte[66000];
+        for (int i=0; i < 66000; i++) {
+            content[i] = 'A';
+        }
+
+        final Queue<FlowFileRecord> flowFiles = new LinkedList<>();
+        final FlowFileRecord flowFile1 = new 
MockFlowFileRecord(content.length);
+        flowFiles.offer(flowFile1);
+
+        final Map<FlowFileRecord, InputStream> contentMap = new HashMap<>();
+        contentMap.put(flowFile1, new ByteArrayInputStream(content));
+
+        final FlowFileContentAccess contentAccess = contentMap::get;
+
+        final RegisteredPartition partition = new 
RegisteredPartition("unit-test-connection", () -> false,
+            flowFiles::poll, NOP_FAILURE_CALLBACK, (ff) -> {}, () -> 
LoadBalanceCompression.DO_NOT_COMPRESS, () -> true);
+
+        final SocketChannel socketChannel = SocketChannel.open(new 
InetSocketAddress("localhost", port));
+
+        socketChannel.configureBlocking(false);
+        final PeerChannel peerChannel = new PeerChannel(socketChannel, null, 
"unit-test");
+        final LoadBalanceSession transaction = new 
LoadBalanceSession(partition, contentAccess, new 
StandardLoadBalanceFlowFileCodec(), peerChannel, 30000,
+            new SimpleLimitThreshold(100, 10_000_000));
+
+        Thread.sleep(100L);
+
+        while (transaction.communicate()) {
+        }
+
+        socketChannel.close();
+
+        final Checksum expectedChecksum = new CRC32();
+        final ByteArrayOutputStream expectedOut = new ByteArrayOutputStream();
+        expectedOut.write(1); // Protocol Version
+
+        final DataOutputStream expectedDos = new DataOutputStream(new 
CheckedOutputStream(expectedOut, expectedChecksum));
+
+        expectedDos.writeUTF("unit-test-connection");
+
+        expectedDos.write(LoadBalanceProtocolConstants.CHECK_SPACE);
+        expectedDos.write(LoadBalanceProtocolConstants.MORE_FLOWFILES);
+        expectedDos.writeInt(68); // metadata length
+        expectedDos.writeInt(1); // 1 attribute
+        expectedDos.writeInt(4); // length of attribute
+        expectedDos.write("uuid".getBytes());
+        expectedDos.writeInt(flowFile1.getAttribute("uuid").length());
+        expectedDos.write(flowFile1.getAttribute("uuid").getBytes());
+        expectedDos.writeLong(flowFile1.getLineageStartDate()); // lineage 
start date
+        expectedDos.writeLong(flowFile1.getEntryDate()); // entry date
+
+        // first data frame
+        expectedDos.write(LoadBalanceProtocolConstants.DATA_FRAME_FOLLOWS);
+        expectedDos.writeShort(LoadBalanceSession.MAX_DATA_FRAME_SIZE);
+        expectedDos.write(Arrays.copyOfRange(content, 0, 
LoadBalanceSession.MAX_DATA_FRAME_SIZE));
+
+        // second data frame
+        expectedDos.write(LoadBalanceProtocolConstants.DATA_FRAME_FOLLOWS);
+        expectedDos.writeShort(content.length - 
LoadBalanceSession.MAX_DATA_FRAME_SIZE);
+        expectedDos.write(Arrays.copyOfRange(content, 
LoadBalanceSession.MAX_DATA_FRAME_SIZE, content.length));
+        expectedDos.write(LoadBalanceProtocolConstants.NO_DATA_FRAME);
+
+        expectedDos.write(LoadBalanceProtocolConstants.NO_MORE_FLOWFILES);
+        expectedDos.writeLong(expectedChecksum.getValue());
+        expectedDos.write(LoadBalanceProtocolConstants.COMPLETE_TRANSACTION);
+
+        final byte[] expectedSent = expectedOut.toByteArray();
+
+        while (received.size() < expectedSent.length) {
+            Thread.sleep(10L);
+        }
+        final byte[] dataSent = received.toByteArray();
+
+        assertArrayEquals(expectedSent, dataSent);
+
+        assertEquals(Arrays.asList(flowFile1), transaction.getFlowFilesSent());
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/server/TestStandardLoadBalanceProtocol.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/server/TestStandardLoadBalanceProtocol.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/server/TestStandardLoadBalanceProtocol.java
new file mode 100644
index 0000000..d020c12
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/server/TestStandardLoadBalanceProtocol.java
@@ -0,0 +1,656 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.queue.clustered.server;
+
+import org.apache.nifi.connectable.Connection;
+import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.controller.queue.LoadBalanceCompression;
+import org.apache.nifi.controller.queue.LoadBalancedFlowFileQueue;
+import org.apache.nifi.controller.repository.ContentRepository;
+import org.apache.nifi.controller.repository.FlowFileRecord;
+import org.apache.nifi.controller.repository.FlowFileRepository;
+import org.apache.nifi.controller.repository.RepositoryRecord;
+import org.apache.nifi.controller.repository.claim.ContentClaim;
+import org.apache.nifi.controller.repository.claim.ResourceClaim;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.provenance.ProvenanceRepository;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.zip.CRC32;
+import java.util.zip.CheckedOutputStream;
+import java.util.zip.Checksum;
+
+import static 
org.apache.nifi.controller.queue.clustered.protocol.LoadBalanceProtocolConstants.ABORT_TRANSACTION;
+import static 
org.apache.nifi.controller.queue.clustered.protocol.LoadBalanceProtocolConstants.CHECK_SPACE;
+import static 
org.apache.nifi.controller.queue.clustered.protocol.LoadBalanceProtocolConstants.COMPLETE_TRANSACTION;
+import static 
org.apache.nifi.controller.queue.clustered.protocol.LoadBalanceProtocolConstants.CONFIRM_CHECKSUM;
+import static 
org.apache.nifi.controller.queue.clustered.protocol.LoadBalanceProtocolConstants.CONFIRM_COMPLETE_TRANSACTION;
+import static 
org.apache.nifi.controller.queue.clustered.protocol.LoadBalanceProtocolConstants.DATA_FRAME_FOLLOWS;
+import static 
org.apache.nifi.controller.queue.clustered.protocol.LoadBalanceProtocolConstants.MORE_FLOWFILES;
+import static 
org.apache.nifi.controller.queue.clustered.protocol.LoadBalanceProtocolConstants.NO_DATA_FRAME;
+import static 
org.apache.nifi.controller.queue.clustered.protocol.LoadBalanceProtocolConstants.NO_MORE_FLOWFILES;
+import static 
org.apache.nifi.controller.queue.clustered.protocol.LoadBalanceProtocolConstants.REJECT_CHECKSUM;
+import static 
org.apache.nifi.controller.queue.clustered.protocol.LoadBalanceProtocolConstants.SKIP_SPACE_CHECK;
+import static 
org.apache.nifi.controller.queue.clustered.protocol.LoadBalanceProtocolConstants.SPACE_AVAILABLE;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.anyCollection;
+import static org.mockito.Matchers.anyList;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.when;
+
+public class TestStandardLoadBalanceProtocol {
+    // Authorizer that accepts every peer; these tests exercise the protocol, not authorization.
+    private final LoadBalanceAuthorizer ALWAYS_AUTHORIZED = nodeIds -> {};
+    // Mocked framework collaborators, re-created for each test in setup().
+    private FlowFileRepository flowFileRepo;
+    private ContentRepository contentRepo;
+    private ProvenanceRepository provenanceRepo;
+    private FlowController flowController;
+    private LoadBalancedFlowFileQueue flowFileQueue;
+
+    // Records captured from the mocks so each test can assert what the protocol handed off.
+    private List<RepositoryRecord> flowFileRepoUpdateRecords;
+    private List<ProvenanceEventRecord> provRepoUpdateRecords;
+    private List<FlowFileRecord> flowFileQueuePutRecords;
+    private List<FlowFileRecord> flowFileQueueReceiveRecords;
+
+    // Bytes written to each ContentClaim, captured when the claim's OutputStream is closed.
+    private ConcurrentMap<ContentClaim, byte[]> claimContents;
+
+
+    /**
+     * Re-creates the mocked repositories, controller, and queue before each test and wires
+     * them to capture everything the protocol writes so the tests can assert on it.
+     */
+    @Before
+    public void setup() throws IOException {
+        flowFileQueuePutRecords = new ArrayList<>();
+        flowFileQueueReceiveRecords = new ArrayList<>();
+        flowFileRepoUpdateRecords = new ArrayList<>();
+        provRepoUpdateRecords = new ArrayList<>();
+
+        flowFileRepo = Mockito.mock(FlowFileRepository.class);
+        contentRepo = Mockito.mock(ContentRepository.class);
+        provenanceRepo = Mockito.mock(ProvenanceRepository.class);
+        flowController = Mockito.mock(FlowController.class);
+        claimContents = new ConcurrentHashMap<>();
+
+        // Every claim created by the protocol is a fresh mock with its own resource claim.
+        Mockito.doAnswer(new Answer<ContentClaim>() {
+            @Override
+            public ContentClaim answer(final InvocationOnMock invocation) 
throws Throwable {
+                final ContentClaim contentClaim = 
Mockito.mock(ContentClaim.class);
+                final ResourceClaim resourceClaim = 
Mockito.mock(ResourceClaim.class);
+                
when(contentClaim.getResourceClaim()).thenReturn(resourceClaim);
+                return contentClaim;
+            }
+        }).when(contentRepo).create(Mockito.anyBoolean());
+
+        // Writes to a claim buffer in memory; the bytes land in claimContents on close().
+        Mockito.doAnswer(new Answer<OutputStream>() {
+            @Override
+            public OutputStream answer(final InvocationOnMock invocation) 
throws Throwable {
+                final ContentClaim contentClaim = invocation.getArgumentAt(0, 
ContentClaim.class);
+
+                final ByteArrayOutputStream baos = new ByteArrayOutputStream() 
{
+                    @Override
+                    public void close() throws IOException {
+                        super.close();
+                        claimContents.put(contentClaim, toByteArray());
+                    }
+                };
+
+                return baos;
+            }
+        }).when(contentRepo).write(Mockito.any(ContentClaim.class));
+
+        // Any connection id resolves to a single mocked connection/queue with no compression.
+        final Connection connection = Mockito.mock(Connection.class);
+        
when(flowController.getConnection(Mockito.anyString())).thenReturn(connection);
+
+        flowFileQueue = Mockito.mock(LoadBalancedFlowFileQueue.class);
+        
when(flowFileQueue.getLoadBalanceCompression()).thenReturn(LoadBalanceCompression.DO_NOT_COMPRESS);
+        when(connection.getFlowFileQueue()).thenReturn(flowFileQueue);
+
+        // Capture the FlowFile collections the protocol hands to the queue and repositories.
+        Mockito.doAnswer(new Answer<Void>() {
+            @Override
+            public Void answer(final InvocationOnMock invocation) throws 
Throwable {
+                flowFileQueuePutRecords.addAll(invocation.getArgumentAt(0, 
Collection.class));
+                return null;
+            }
+        }).when(flowFileQueue).putAll(anyCollection());
+
+        Mockito.doAnswer(new Answer<Void>() {
+            @Override
+            public Void answer(final InvocationOnMock invocation) throws 
Throwable {
+                flowFileQueueReceiveRecords.addAll(invocation.getArgumentAt(0, 
Collection.class));
+                return null;
+            }
+        }).when(flowFileQueue).receiveFromPeer(anyCollection());
+
+        Mockito.doAnswer(new Answer<Void>() {
+            @Override
+            public Void answer(final InvocationOnMock invocation) throws 
Throwable {
+                flowFileRepoUpdateRecords.addAll(invocation.getArgumentAt(0, 
Collection.class));
+                return null;
+            }
+        }).when(flowFileRepo).updateRepository(anyCollection());
+
+        Mockito.doAnswer(new Answer<Void>() {
+            @Override
+            public Void answer(final InvocationOnMock invocation) throws 
Throwable {
+                provRepoUpdateRecords.addAll(invocation.getArgumentAt(0, 
Collection.class));
+                return null;
+            }
+        }).when(provenanceRepo).registerEvents(anyCollection());
+    }
+
+
+    /**
+     * Happy path: one FlowFile with attributes and content, a valid checksum, and a completed
+     * transaction. Verifies the three-byte server response, the claim content, and that the
+     * FlowFile is received from the peer rather than simply queued.
+     */
+    @Test
+    public void testSimpleFlowFileTransaction() throws IOException {
+        final StandardLoadBalanceProtocol protocol = new 
StandardLoadBalanceProtocol(flowFileRepo, contentRepo, provenanceRepo, 
flowController, ALWAYS_AUTHORIZED);
+
+        final PipedInputStream serverInput = new PipedInputStream();
+        final PipedOutputStream serverContentSource = new PipedOutputStream();
+        serverInput.connect(serverContentSource);
+
+        final ByteArrayOutputStream serverOutput = new ByteArrayOutputStream();
+
+        // Write connection ID
+        final Checksum checksum = new CRC32();
+        final OutputStream checkedOutput = new 
CheckedOutputStream(serverContentSource, checksum);
+        final DataOutputStream dos = new DataOutputStream(checkedOutput);
+        dos.writeUTF("unit-test-connection-id");
+
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("a", "A");
+        attributes.put("uuid", "unit-test-id");
+        attributes.put("b", "B");
+
+        dos.write(CHECK_SPACE);
+        dos.write(MORE_FLOWFILES);
+        writeAttributes(attributes, dos);
+        writeContent("hello".getBytes(), dos);
+        dos.write(NO_MORE_FLOWFILES);
+
+        // Checksum covers everything written through the CheckedOutputStream above.
+        dos.writeLong(checksum.getValue());
+        dos.write(COMPLETE_TRANSACTION);
+
+        protocol.receiveFlowFiles(serverInput, serverOutput, "Unit Test", 1, 
"unit.test");
+
+        final byte[] serverResponse = serverOutput.toByteArray();
+        assertEquals(3, serverResponse.length);
+        assertEquals(SPACE_AVAILABLE, serverResponse[0]);
+        assertEquals(CONFIRM_CHECKSUM, serverResponse[1]);
+        assertEquals(CONFIRM_COMPLETE_TRANSACTION, serverResponse[2]);
+
+        assertEquals(1, claimContents.size());
+        final byte[] firstFlowFileContent = 
claimContents.values().iterator().next();
+        assertArrayEquals("hello".getBytes(), firstFlowFileContent);
+
+        Mockito.verify(flowFileRepo, 
times(1)).updateRepository(anyCollection());
+        Mockito.verify(provenanceRepo, times(1)).registerEvents(anyList());
+        Mockito.verify(flowFileQueue, times(0)).putAll(anyCollection());
+        Mockito.verify(flowFileQueue, 
times(1)).receiveFromPeer(anyCollection());
+    }
+
+    /**
+     * Sends four FlowFiles in one transaction (normal content, null content, more content,
+     * empty content) and verifies responses, captured records, and that all payload bytes land
+     * in a single content claim.
+     */
+    @Test
+    public void testMultipleFlowFiles() throws IOException {
+        final StandardLoadBalanceProtocol protocol = new 
StandardLoadBalanceProtocol(flowFileRepo, contentRepo, provenanceRepo, 
flowController, ALWAYS_AUTHORIZED);
+
+        final PipedInputStream serverInput = new PipedInputStream();
+        final PipedOutputStream serverContentSource = new PipedOutputStream();
+        serverInput.connect(serverContentSource);
+
+        final ByteArrayOutputStream serverOutput = new ByteArrayOutputStream();
+
+        // Write connection ID
+        final Checksum checksum = new CRC32();
+        final OutputStream checkedOutput = new 
CheckedOutputStream(serverContentSource, checksum);
+        final DataOutputStream dos = new DataOutputStream(checkedOutput);
+        dos.writeUTF("unit-test-connection-id");
+
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("a", "A");
+        attributes.put("uuid", "unit-test-id");
+        attributes.put("b", "B");
+
+        // Send 4 FlowFiles.
+        dos.write(CHECK_SPACE);
+        dos.write(MORE_FLOWFILES);
+        writeAttributes(attributes, dos);
+        writeContent("hello".getBytes(), dos);
+
+        dos.write(MORE_FLOWFILES);
+        writeAttributes(Collections.singletonMap("uuid", "unit-test-id-2"), 
dos);
+        writeContent(null, dos);
+
+        dos.write(MORE_FLOWFILES);
+        writeAttributes(Collections.singletonMap("uuid", "unit-test-id-3"), 
dos);
+        writeContent("greetings".getBytes(), dos);
+
+        dos.write(MORE_FLOWFILES);
+        writeAttributes(Collections.singletonMap("uuid", "unit-test-id-4"), 
dos);
+        writeContent(new byte[0], dos);
+
+        dos.write(NO_MORE_FLOWFILES);
+
+        dos.writeLong(checksum.getValue());
+        dos.write(COMPLETE_TRANSACTION);
+
+        protocol.receiveFlowFiles(serverInput, serverOutput, "Unit Test", 1, 
"unit.test");
+
+        final byte[] serverResponse = serverOutput.toByteArray();
+        assertEquals(3, serverResponse.length);
+        assertEquals(SPACE_AVAILABLE, serverResponse[0]);
+        assertEquals(CONFIRM_CHECKSUM, serverResponse[1]);
+        assertEquals(CONFIRM_COMPLETE_TRANSACTION, serverResponse[2]);
+
+        // The transaction shares one claim; the assertion accepts either ordering of the two
+        // non-empty payloads within it.
+        assertEquals(1, claimContents.size());
+        final byte[] bytes = claimContents.values().iterator().next();
+        assertTrue(Arrays.equals("hellogreetings".getBytes(), bytes) || 
Arrays.equals("greetingshello".getBytes(), bytes));
+
+        assertEquals(4, flowFileRepoUpdateRecords.size());
+        assertEquals(4, provRepoUpdateRecords.size());
+        assertEquals(0, flowFileQueuePutRecords.size());
+        assertEquals(4, flowFileQueueReceiveRecords.size());
+
+        assertTrue(provRepoUpdateRecords.stream().allMatch(event -> 
event.getEventType() == ProvenanceEventType.RECEIVE));
+    }
+
+
+    /**
+     * Same four-FlowFile transaction as {@code testMultipleFlowFiles}, but opening with
+     * SKIP_SPACE_CHECK instead of CHECK_SPACE: the server must not send SPACE_AVAILABLE, so
+     * the response is two bytes instead of three.
+     */
+    @Test
+    public void testMultipleFlowFilesWithoutCheckingSpace() throws IOException 
{
+        final StandardLoadBalanceProtocol protocol = new 
StandardLoadBalanceProtocol(flowFileRepo, contentRepo, provenanceRepo, 
flowController, ALWAYS_AUTHORIZED);
+
+        final PipedInputStream serverInput = new PipedInputStream();
+        final PipedOutputStream serverContentSource = new PipedOutputStream();
+        serverInput.connect(serverContentSource);
+
+        final ByteArrayOutputStream serverOutput = new ByteArrayOutputStream();
+
+        // Write connection ID
+        final Checksum checksum = new CRC32();
+        final OutputStream checkedOutput = new 
CheckedOutputStream(serverContentSource, checksum);
+        final DataOutputStream dos = new DataOutputStream(checkedOutput);
+        dos.writeUTF("unit-test-connection-id");
+
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("a", "A");
+        attributes.put("uuid", "unit-test-id");
+        attributes.put("b", "B");
+
+        // Send 4 FlowFiles.
+        dos.write(SKIP_SPACE_CHECK);
+        dos.write(MORE_FLOWFILES);
+        writeAttributes(attributes, dos);
+        writeContent("hello".getBytes(), dos);
+
+        dos.write(MORE_FLOWFILES);
+        writeAttributes(Collections.singletonMap("uuid", "unit-test-id-2"), 
dos);
+        writeContent(null, dos);
+
+        dos.write(MORE_FLOWFILES);
+        writeAttributes(Collections.singletonMap("uuid", "unit-test-id-3"), 
dos);
+        writeContent("greetings".getBytes(), dos);
+
+        dos.write(MORE_FLOWFILES);
+        writeAttributes(Collections.singletonMap("uuid", "unit-test-id-4"), 
dos);
+        writeContent(new byte[0], dos);
+
+        dos.write(NO_MORE_FLOWFILES);
+
+        dos.writeLong(checksum.getValue());
+        dos.write(COMPLETE_TRANSACTION);
+
+        protocol.receiveFlowFiles(serverInput, serverOutput, "Unit Test", 1, 
"unit.test");
+
+        // No SPACE_AVAILABLE byte when the space check is skipped.
+        final byte[] serverResponse = serverOutput.toByteArray();
+        assertEquals(2, serverResponse.length);
+        assertEquals(CONFIRM_CHECKSUM, serverResponse[0]);
+        assertEquals(CONFIRM_COMPLETE_TRANSACTION, serverResponse[1]);
+
+        assertEquals(1, claimContents.size());
+        final byte[] bytes = claimContents.values().iterator().next();
+        assertTrue(Arrays.equals("hellogreetings".getBytes(), bytes) || 
Arrays.equals("greetingshello".getBytes(), bytes));
+
+        assertEquals(4, flowFileRepoUpdateRecords.size());
+        assertEquals(4, provRepoUpdateRecords.size());
+        assertEquals(0, flowFileQueuePutRecords.size());
+        assertEquals(4, flowFileQueueReceiveRecords.size());
+
+        assertTrue(provRepoUpdateRecords.stream().allMatch(event -> 
event.getEventType() == ProvenanceEventType.RECEIVE));
+    }
+
+    /**
+     * Closes the stream mid-transaction (after four FlowFiles but before NO_MORE_FLOWFILES
+     * and the checksum), simulating a dropped peer. The protocol must throw EOFException and
+     * must not update the FlowFile repo, provenance repo, or queue.
+     */
+    @Test
+    public void testEofExceptionMultipleFlowFiles() throws IOException {
+        final StandardLoadBalanceProtocol protocol = new 
StandardLoadBalanceProtocol(flowFileRepo, contentRepo, provenanceRepo, 
flowController, ALWAYS_AUTHORIZED);
+
+        final PipedInputStream serverInput = new PipedInputStream();
+        final PipedOutputStream serverContentSource = new PipedOutputStream();
+        serverInput.connect(serverContentSource);
+
+        final ByteArrayOutputStream serverOutput = new ByteArrayOutputStream();
+
+        // Write connection ID
+        final Checksum checksum = new CRC32();
+        final OutputStream checkedOutput = new 
CheckedOutputStream(serverContentSource, checksum);
+        final DataOutputStream dos = new DataOutputStream(checkedOutput);
+        dos.writeUTF("unit-test-connection-id");
+
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("a", "A");
+        attributes.put("uuid", "unit-test-id");
+        attributes.put("b", "B");
+
+        // Send 4 FlowFiles.
+        dos.write(CHECK_SPACE);
+        dos.write(MORE_FLOWFILES);
+        writeAttributes(attributes, dos);
+        writeContent("hello".getBytes(), dos);
+
+        dos.write(MORE_FLOWFILES);
+        writeAttributes(Collections.singletonMap("uuid", "unit-test-id-2"), 
dos);
+        writeContent(null, dos);
+
+        dos.write(MORE_FLOWFILES);
+        writeAttributes(Collections.singletonMap("uuid", "unit-test-id-3"), 
dos);
+        writeContent("greetings".getBytes(), dos);
+
+        dos.write(MORE_FLOWFILES);
+        writeAttributes(Collections.singletonMap("uuid", "unit-test-id-4"), 
dos);
+        writeContent(new byte[0], dos);
+
+        // Close without NO_MORE_FLOWFILES / checksum: the server hits end-of-stream.
+        dos.flush();
+        dos.close();
+
+        try {
+            protocol.receiveFlowFiles(serverInput, serverOutput, "Unit Test", 
1, "unit.test");
+            Assert.fail("Expected EOFException but none was thrown");
+        } catch (final EOFException eof) {
+            // expected
+        }
+
+        final byte[] serverResponse = serverOutput.toByteArray();
+        assertEquals(1, serverResponse.length);
+        assertEquals(SPACE_AVAILABLE, serverResponse[0]);
+
+        // Content had already been written to the claim before the failure was detected.
+        assertEquals(1, claimContents.size());
+        assertArrayEquals("hellogreetings".getBytes(), 
claimContents.values().iterator().next());
+
+        assertEquals(0, flowFileRepoUpdateRecords.size());
+        assertEquals(0, provRepoUpdateRecords.size());
+        assertEquals(0, flowFileQueuePutRecords.size());
+    }
+
+    /**
+     * Sends a valid FlowFile followed by a wrong checksum. The server must reply with
+     * REJECT_CHECKSUM, abort the transaction without touching the repositories or queue, and
+     * release the content claim it had written.
+     */
+    @Test
+    public void testBadChecksum() throws IOException {
+        final StandardLoadBalanceProtocol protocol = new 
StandardLoadBalanceProtocol(flowFileRepo, contentRepo, provenanceRepo, 
flowController, ALWAYS_AUTHORIZED);
+
+        final PipedInputStream serverInput = new PipedInputStream();
+        final PipedOutputStream serverContentSource = new PipedOutputStream();
+        serverInput.connect(serverContentSource);
+
+        final ByteArrayOutputStream serverOutput = new ByteArrayOutputStream();
+
+        // Write connection ID
+        final Checksum checksum = new CRC32();
+        final OutputStream checkedOutput = new 
CheckedOutputStream(serverContentSource, checksum);
+        final DataOutputStream dos = new DataOutputStream(checkedOutput);
+        dos.writeUTF("unit-test-connection-id");
+
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("uuid", "unit-test-id");
+
+        dos.write(CHECK_SPACE);
+        dos.write(MORE_FLOWFILES);
+        writeAttributes(attributes, dos);
+        writeContent("hello".getBytes(), dos);
+        dos.write(NO_MORE_FLOWFILES);
+
+        dos.writeLong(1L); // Write bad checksum.
+        dos.write(COMPLETE_TRANSACTION);
+
+        try {
+            protocol.receiveFlowFiles(serverInput, serverOutput, "Unit Test", 
1, "unit.test");
+            Assert.fail("Expected TransactionAbortedException but none was 
thrown");
+        } catch (final TransactionAbortedException e) {
+            // expected
+        }
+
+        final byte[] serverResponse = serverOutput.toByteArray();
+        assertEquals(2, serverResponse.length);
+        assertEquals(SPACE_AVAILABLE, serverResponse[0]);
+        assertEquals(REJECT_CHECKSUM, serverResponse[1]);
+
+        // Content was written before the checksum mismatch, but the claimant count must be
+        // decremented so the claim can be reclaimed.
+        assertEquals(1, claimContents.size());
+        final byte[] firstFlowFileContent = 
claimContents.values().iterator().next();
+        assertArrayEquals("hello".getBytes(), firstFlowFileContent);
+
+        Mockito.verify(flowFileRepo, 
times(0)).updateRepository(anyCollection());
+        Mockito.verify(provenanceRepo, times(0)).registerEvents(anyList());
+        Mockito.verify(flowFileQueue, times(0)).putAll(anyCollection());
+        Mockito.verify(contentRepo, 
times(1)).decrementClaimantCount(claimContents.keySet().iterator().next());
+    }
+
+    /**
+     * Announces a 45-byte data frame but closes the stream after only 5 bytes. The server must
+     * throw EOFException, leave the repositories and queue untouched, and release the claim.
+     */
+    @Test
+    public void testEofWritingContent() throws IOException {
+        final StandardLoadBalanceProtocol protocol = new 
StandardLoadBalanceProtocol(flowFileRepo, contentRepo, provenanceRepo, 
flowController, ALWAYS_AUTHORIZED);
+
+        final PipedInputStream serverInput = new PipedInputStream();
+        final PipedOutputStream serverContentSource = new PipedOutputStream();
+        serverInput.connect(serverContentSource);
+
+        final ByteArrayOutputStream serverOutput = new ByteArrayOutputStream();
+
+        // Write connection ID
+        final Checksum checksum = new CRC32();
+        final OutputStream checkedOutput = new 
CheckedOutputStream(serverContentSource, checksum);
+        final DataOutputStream dos = new DataOutputStream(checkedOutput);
+        dos.writeUTF("unit-test-connection-id");
+
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("uuid", "unit-test-id");
+
+        dos.write(CHECK_SPACE);
+        dos.write(MORE_FLOWFILES);
+        writeAttributes(attributes, dos);
+
+        // Indicate 45 byte data frame, then stop after 5 bytes.
+        dos.write(DATA_FRAME_FOLLOWS);
+        dos.writeShort(45);
+        dos.write("hello".getBytes());
+        dos.flush();
+        dos.close();
+
+        try {
+            protocol.receiveFlowFiles(serverInput, serverOutput, "Unit Test", 
1, "unit.test");
+            Assert.fail("Expected EOFException but none was thrown");
+        } catch (final EOFException e) {
+            // expected
+        }
+
+        final byte[] serverResponse = serverOutput.toByteArray();
+        assertEquals(1, serverResponse.length);
+        assertEquals(SPACE_AVAILABLE, serverResponse[0]);
+
+        // The claim exists but holds no bytes: the partial frame was never committed.
+        assertEquals(1, claimContents.size());
+        final byte[] firstFlowFileContent = 
claimContents.values().iterator().next();
+        assertArrayEquals(new byte[0], firstFlowFileContent);
+
+        Mockito.verify(flowFileRepo, 
times(0)).updateRepository(anyCollection());
+        Mockito.verify(provenanceRepo, times(0)).registerEvents(anyList());
+        Mockito.verify(flowFileQueue, times(0)).putAll(anyCollection());
+        Mockito.verify(contentRepo, 
times(1)).decrementClaimantCount(claimContents.keySet().iterator().next());
+    }
+
+    /**
+     * Sends a valid FlowFile and a correct checksum, but then aborts instead of completing.
+     * The server confirms the checksum, throws TransactionAbortedException, and rolls back:
+     * nothing reaches the repositories or queue and the claimant count is decremented.
+     */
+    @Test
+    public void testAbortAfterChecksumConfirmation() throws IOException {
+        final StandardLoadBalanceProtocol protocol = new 
StandardLoadBalanceProtocol(flowFileRepo, contentRepo, provenanceRepo, 
flowController, ALWAYS_AUTHORIZED);
+
+        final PipedInputStream serverInput = new PipedInputStream();
+        final PipedOutputStream serverContentSource = new PipedOutputStream();
+        serverInput.connect(serverContentSource);
+
+        final ByteArrayOutputStream serverOutput = new ByteArrayOutputStream();
+
+        // Write connection ID
+        final Checksum checksum = new CRC32();
+        final OutputStream checkedOutput = new 
CheckedOutputStream(serverContentSource, checksum);
+        final DataOutputStream dos = new DataOutputStream(checkedOutput);
+        dos.writeUTF("unit-test-connection-id");
+
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("uuid", "unit-test-id");
+
+        dos.write(CHECK_SPACE);
+        dos.write(MORE_FLOWFILES);
+        writeAttributes(attributes, dos);
+        writeContent("hello".getBytes(), dos);
+        dos.write(NO_MORE_FLOWFILES);
+
+        // Correct checksum, but the client aborts rather than completing.
+        dos.writeLong(checksum.getValue());
+        dos.write(ABORT_TRANSACTION);
+
+        try {
+            protocol.receiveFlowFiles(serverInput, serverOutput, "Unit Test", 
1, "unit.test");
+            Assert.fail("Expected TransactionAbortedException but none was 
thrown");
+        } catch (final TransactionAbortedException e) {
+            // expected
+        }
+
+        final byte[] serverResponse = serverOutput.toByteArray();
+        assertEquals(2, serverResponse.length);
+        assertEquals(SPACE_AVAILABLE, serverResponse[0]);
+        assertEquals(CONFIRM_CHECKSUM, serverResponse[1]);
+
+        assertEquals(1, claimContents.size());
+        final byte[] firstFlowFileContent = 
claimContents.values().iterator().next();
+        assertArrayEquals("hello".getBytes(), firstFlowFileContent);
+
+        Mockito.verify(flowFileRepo, 
times(0)).updateRepository(anyCollection());
+        Mockito.verify(provenanceRepo, times(0)).registerEvents(anyList());
+        Mockito.verify(flowFileQueue, times(0)).putAll(anyCollection());
+        Mockito.verify(contentRepo, 
times(1)).decrementClaimantCount(claimContents.keySet().iterator().next());
+    }
+
+    /**
+     * A FlowFile with attributes but no content (NO_DATA_FRAME only) completes normally: the
+     * full three-byte confirmation is returned and the single claim holds zero bytes.
+     */
+    @Test
+    public void testFlowFileNoContent() throws IOException {
+        final StandardLoadBalanceProtocol protocol = new 
StandardLoadBalanceProtocol(flowFileRepo, contentRepo, provenanceRepo, 
flowController, ALWAYS_AUTHORIZED);
+
+        final PipedInputStream serverInput = new PipedInputStream();
+        final PipedOutputStream serverContentSource = new PipedOutputStream();
+        serverInput.connect(serverContentSource);
+
+        final ByteArrayOutputStream serverOutput = new ByteArrayOutputStream();
+
+        // Write connection ID
+        final Checksum checksum = new CRC32();
+        final OutputStream checkedOutput = new 
CheckedOutputStream(serverContentSource, checksum);
+        final DataOutputStream dos = new DataOutputStream(checkedOutput);
+        dos.writeUTF("unit-test-connection-id");
+
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("uuid", "unit-test-id");
+
+        dos.write(CHECK_SPACE);
+        dos.write(MORE_FLOWFILES);
+        writeAttributes(attributes, dos);
+        writeContent(null, dos);
+        dos.write(NO_MORE_FLOWFILES);
+
+        dos.writeLong(checksum.getValue());
+        dos.write(COMPLETE_TRANSACTION);
+
+        protocol.receiveFlowFiles(serverInput, serverOutput, "Unit Test", 1, 
"unit.test");
+
+        final byte[] serverResponse = serverOutput.toByteArray();
+        assertEquals(3, serverResponse.length);
+        assertEquals(SPACE_AVAILABLE, serverResponse[0]);
+        assertEquals(CONFIRM_CHECKSUM, serverResponse[1]);
+        assertEquals(CONFIRM_COMPLETE_TRANSACTION, serverResponse[2]);
+
+        assertEquals(1, claimContents.size());
+        assertEquals(0, claimContents.values().iterator().next().length);
+
+        Mockito.verify(flowFileRepo, 
times(1)).updateRepository(anyCollection());
+        Mockito.verify(provenanceRepo, times(1)).registerEvents(anyList());
+        Mockito.verify(flowFileQueue, times(0)).putAll(anyCollection());
+        Mockito.verify(flowFileQueue, 
times(1)).receiveFromPeer(anyCollection());
+    }
+
+    /**
+     * Serializes the attribute map in the load-balance wire format: a 4-byte total block
+     * length, then the attribute count, then length-prefixed key/value byte pairs, then the
+     * lineage start date and entry date (both written as 0 here).
+     *
+     * NOTE(review): keys and values are encoded with String.getBytes(), i.e. the platform
+     * default charset — presumably the receiving side decodes the same way; confirm whether
+     * the protocol requires UTF-8.
+     */
+    private void writeAttributes(final Map<String, String> attributes, final 
DataOutputStream dos) throws IOException {
+        try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+             final DataOutputStream out = new DataOutputStream(baos)) {
+            out.writeInt(attributes.size());
+
+            for (final Map.Entry<String, String> entry : 
attributes.entrySet()) {
+                final byte[] key = entry.getKey().getBytes();
+                out.writeInt(key.length);
+                out.write(key);
+
+                final byte[] value = entry.getValue().getBytes();
+                out.writeInt(value.length);
+                out.write(value);
+            }
+
+            out.writeLong(0L); // lineage start date
+            out.writeLong(0L); // entry date
+
+            // Prefix the whole attribute block with its length, then emit it.
+            dos.writeInt(baos.size());
+            baos.writeTo(dos);
+        }
+
+    }
+
+    /**
+     * Serializes FlowFile content as a sequence of data frames, each at most 65535 bytes
+     * (the frame length is written as an unsigned short), terminated by NO_DATA_FRAME.
+     * A null content array emits only the NO_DATA_FRAME marker.
+     */
+    private void writeContent(final byte[] content, final DataOutputStream out) throws IOException {
+        if (content == null) {
+            out.write(NO_DATA_FRAME);
+            return;
+        }
+
+        int iterations = content.length / 65535;
+        if (content.length % 65535 > 0) {
+            iterations++;
+        }
+
+        for (int i=0; i < iterations; i++) {
+            // Frames hold at most 65535 bytes, so successive frames start at multiples of
+            // 65535. (Was i * 65536, which would skip one byte per preceding frame for any
+            // content larger than a single frame.)
+            final int offset = i * 65535;
+            final int length = Math.min(content.length - offset, 65535);
+
+            out.write(DATA_FRAME_FOLLOWS);
+            out.writeShort(length);
+            out.write(content, offset, length);
+        }
+
+        out.write(NO_DATA_FRAME);
+    }
+}

Reply via email to