http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSocketLoadBalancedFlowFileQueue.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSocketLoadBalancedFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSocketLoadBalancedFlowFileQueue.java new file mode 100644 index 0000000..971770a --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSocketLoadBalancedFlowFileQueue.java @@ -0,0 +1,514 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.controller.queue.clustered; + +import org.apache.nifi.cluster.coordination.ClusterCoordinator; +import org.apache.nifi.cluster.coordination.ClusterTopologyEventListener; +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.connectable.Connection; +import org.apache.nifi.controller.MockFlowFileRecord; +import org.apache.nifi.controller.MockSwapManager; +import org.apache.nifi.controller.ProcessScheduler; +import org.apache.nifi.controller.queue.NopConnectionEventListener; +import org.apache.nifi.controller.queue.QueueSize; +import org.apache.nifi.controller.queue.clustered.client.async.AsyncLoadBalanceClientRegistry; +import org.apache.nifi.controller.queue.clustered.partition.FlowFilePartitioner; +import org.apache.nifi.controller.queue.clustered.partition.QueuePartition; +import org.apache.nifi.controller.queue.clustered.partition.RoundRobinPartitioner; +import org.apache.nifi.controller.repository.ContentRepository; +import org.apache.nifi.controller.repository.FlowFileRecord; +import org.apache.nifi.controller.repository.FlowFileRepository; +import org.apache.nifi.controller.repository.SwapSummary; +import org.apache.nifi.controller.repository.claim.ResourceClaimManager; +import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager; +import org.apache.nifi.events.EventReporter; +import org.apache.nifi.provenance.ProvenanceEventRepository; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import 
static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

/**
 * Unit tests for {@link SocketLoadBalancedFlowFileQueue}. Each test runs against a queue that is
 * built in {@link #setup()} with a mocked {@link ClusterCoordinator} reporting three nodes, and
 * uses small test-only {@link FlowFilePartitioner} implementations to control which partition
 * each FlowFile is placed into.
 */
public class TestSocketLoadBalancedFlowFileQueue {

    private Connection connection;
    private FlowFileRepository flowFileRepo;
    private ContentRepository contentRepo;
    private ProvenanceEventRepository provRepo;
    private ResourceClaimManager claimManager;
    private ClusterCoordinator clusterCoordinator;
    private MockSwapManager swapManager;
    private EventReporter eventReporter;
    private SocketLoadBalancedFlowFileQueue queue;
    // Captured from the mocked ClusterCoordinator when a listener is registered with it.
    private volatile ClusterTopologyEventListener clusterTopologyEventListener;

    private List<NodeIdentifier> nodeIds;
    // Every NodeIdentifier created by this test consumes the next five port numbers.
    private int nodePort = 4096;

    /**
     * Resets the FlowFile ID generator, creates the mocks and the three Node Identifiers
     * (the first of which is reported as the local node), and builds the queue under test.
     */
    @Before
    public void setup() {
        MockFlowFileRecord.resetIdGenerator();
        connection = mock(Connection.class);
        when(connection.getIdentifier()).thenReturn("unit-test");

        flowFileRepo = mock(FlowFileRepository.class);
        contentRepo = mock(ContentRepository.class);
        provRepo = mock(ProvenanceEventRepository.class);
        claimManager = new StandardResourceClaimManager();
        clusterCoordinator = mock(ClusterCoordinator.class);
        swapManager = new MockSwapManager();
        eventReporter = EventReporter.NO_OP;

        final NodeIdentifier localNodeIdentifier = createNodeIdentifier();

        nodeIds = new ArrayList<>();
        nodeIds.add(localNodeIdentifier);
        nodeIds.add(createNodeIdentifier());
        nodeIds.add(createNodeIdentifier());

        // Answer lazily so that tests which modify nodeIds are reflected in later calls.
        Mockito.doAnswer(new Answer<Set<NodeIdentifier>>() {
            @Override
            public Set<NodeIdentifier> answer(InvocationOnMock invocation) throws Throwable {
                return new HashSet<>(nodeIds);
            }
        }).when(clusterCoordinator).getNodeIdentifiers();

        when(clusterCoordinator.getLocalNodeIdentifier()).thenReturn(localNodeIdentifier);

        // Remember the registered listener so that tests can fire topology events themselves.
        doAnswer(new Answer<Object>() {
            @Override
            public Object answer(final InvocationOnMock invocation) throws Throwable {
                clusterTopologyEventListener = invocation.getArgumentAt(0, ClusterTopologyEventListener.class);
                return null;
            }
        }).when(clusterCoordinator).registerEventListener(Mockito.any(ClusterTopologyEventListener.class));

        queue = createQueue();
    }

    /**
     * Builds a new queue from the current fixture fields, using fresh mocks for the
     * ProcessScheduler and the AsyncLoadBalanceClientRegistry.
     *
     * @return the newly created queue
     */
    private SocketLoadBalancedFlowFileQueue createQueue() {
        final ProcessScheduler scheduler = mock(ProcessScheduler.class);
        final AsyncLoadBalanceClientRegistry registry = mock(AsyncLoadBalanceClientRegistry.class);

        return new SocketLoadBalancedFlowFileQueue("unit-test", new NopConnectionEventListener(), scheduler, flowFileRepo, provRepo,
            contentRepo, claimManager, clusterCoordinator, registry, swapManager, 10000, eventReporter);
    }

    /**
     * @return a Node Identifier with a random UUID as its ID
     */
    private NodeIdentifier createNodeIdentifier() {
        return createNodeIdentifier(UUID.randomUUID().toString());
    }

    /**
     * Creates a Node Identifier for "localhost", taking the next five values of {@link #nodePort}.
     *
     * @param id the ID to assign to the node
     * @return the newly created Node Identifier
     */
    private NodeIdentifier createNodeIdentifier(final String id) {
        return new NodeIdentifier(id, "localhost", nodePort++, "localhost", nodePort++,
            "localhost", nodePort++, "localhost", nodePort++, nodePort++, true, Collections.emptySet());
    }

    /**
     * Verifies that every FlowFile is placed into the partition chosen by the partitioner.
     */
    @Test
    public void testBinsAccordingToPartitioner() {
        final FlowFilePartitioner partitioner = new StaticFlowFilePartitioner(1);
        queue.setFlowFilePartitioner(partitioner);

        final QueuePartition desiredPartition = queue.getPartition(1);
        for (int i = 0; i < 100; i++) {
            final MockFlowFileRecord flowFile = new MockFlowFileRecord(0L);
            final QueuePartition partition = queue.putAndGetPartition(flowFile);
            assertSame(desiredPartition, partition);
        }
    }

    /**
     * Verifies that a single bulk put distributes the FlowFiles across partitions individually.
     */
    @Test
    public void testPutAllBinsFlowFilesSeparately() {
        // Partition data based on size. FlowFiles with 0 bytes will go to partition 0,
        // FlowFiles with 1 byte will go to partition 1, and FlowFiles with 2 bytes will go to partition 2.
        final FlowFilePartitioner partitioner = new FlowFileSizePartitioner();
        queue.setFlowFilePartitioner(partitioner);

        // Add 3 FlowFiles for each size
        final List<FlowFileRecord> flowFiles = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            flowFiles.add(new MockFlowFileRecord(0));
            flowFiles.add(new MockFlowFileRecord(1));
            flowFiles.add(new MockFlowFileRecord(2));
        }

        final Map<QueuePartition, List<FlowFileRecord>> partitionMap = queue.putAllAndGetPartitions(flowFiles);
        assertEquals(3, partitionMap.size());

        // For each partition, get the List of FlowFiles added to it, then verify that there are 3 FlowFiles with that size.
        for (int i = 0; i < 3; i++) {
            final QueuePartition partition = queue.getPartition(i);
            final List<FlowFileRecord> flowFilesForPartition = partitionMap.get(partition);
            assertNotNull(flowFilesForPartition);
            assertEquals(3, flowFilesForPartition.size());

            for (final FlowFileRecord flowFile : flowFilesForPartition) {
                assertEquals(i, flowFile.getSize());
            }
        }
    }

    /**
     * @return the index of a partition that is not the local partition: 1 if partition 0 is the local partition, 0 otherwise
     */
    private int determineRemotePartitionIndex() {
        final QueuePartition localPartition = queue.getLocalPartition();
        return queue.getPartition(0) == localPartition ? 1 : 0;
    }

    /**
     * @return the index at which the queue exposes its local partition
     * @throws IllegalStateException if no partition index maps to the local partition
     */
    private int determineLocalPartitionIndex() {
        final QueuePartition localPartition = queue.getLocalPartition();

        // Ask the coordinator once instead of on every loop iteration.
        final int nodeCount = clusterCoordinator.getNodeIdentifiers().size();
        for (int i = 0; i < nodeCount; i++) {
            if (queue.getPartition(i) == localPartition) {
                return i;
            }
        }

        throw new IllegalStateException("Could not determine local partition index");
    }

    /**
     * Verifies that a FlowFile held in a non-local partition counts toward the queue's size and
     * emptiness but is neither part of the active queue nor returned by poll.
     */
    @Test
    public void testIsEmptyWhenFlowFileInRemotePartition() {
        queue.setFlowFilePartitioner(new StaticFlowFilePartitioner(determineRemotePartitionIndex()));

        assertTrue(queue.isEmpty());
        assertTrue(queue.isActiveQueueEmpty());
        assertEquals(new QueueSize(0, 0L), queue.size());

        queue.put(new MockFlowFileRecord(0L));
        assertFalse(queue.isEmpty());
        assertTrue(queue.isActiveQueueEmpty());
        assertEquals(new QueueSize(1, 0L), queue.size());

        assertNull(queue.poll(new HashSet<>()));
        assertFalse(queue.isEmpty());
        assertTrue(queue.isActiveQueueEmpty());
        assertEquals(new QueueSize(1, 0L), queue.size());
    }

    /**
     * Verifies the size and emptiness of the queue as a FlowFile in the local partition is
     * added, polled, and acknowledged (both individually and as a Collection).
     */
    @Test
    public void testIsEmptyWhenFlowFileInLocalPartition() {
        queue.setFlowFilePartitioner(new StaticFlowFilePartitioner(determineLocalPartitionIndex()));

        // Ensure queue is empty
        assertTrue(queue.isEmpty());
        assertTrue(queue.isActiveQueueEmpty());
        assertEquals(new QueueSize(0, 0L), queue.size());

        // add a flowfile
        final FlowFileRecord flowFile = new MockFlowFileRecord(0L);
        queue.put(flowFile);
        assertFalse(queue.isEmpty());
        assertFalse(queue.isActiveQueueEmpty());
        assertEquals(new QueueSize(1, 0L), queue.size());

        // Ensure that we get the same FlowFile back. This will not decrement
        // the queue size, only acknowledging the FlowFile will do that.
        assertSame(flowFile, queue.poll(new HashSet<>()));
        assertFalse(queue.isEmpty());
        assertTrue(queue.isActiveQueueEmpty());
        assertEquals(new QueueSize(1, 0L), queue.size());

        // Acknowledging FlowFile should reduce queue size
        queue.acknowledge(flowFile);
        assertTrue(queue.isEmpty());
        assertTrue(queue.isActiveQueueEmpty());
        assertEquals(new QueueSize(0, 0L), queue.size());

        // Add FlowFile back in, poll it to ensure that we get it back, and
        // then acknowledge as a Collection and ensure the correct sizes.
        queue.put(flowFile);
        assertFalse(queue.isEmpty());
        assertFalse(queue.isActiveQueueEmpty());
        assertEquals(new QueueSize(1, 0L), queue.size());

        assertSame(flowFile, queue.poll(new HashSet<>()));
        assertFalse(queue.isEmpty());
        assertTrue(queue.isActiveQueueEmpty());
        assertEquals(new QueueSize(1, 0L), queue.size());

        queue.acknowledge(Collections.singleton(flowFile));
        assertTrue(queue.isEmpty());
        assertTrue(queue.isActiveQueueEmpty());
        assertEquals(new QueueSize(0, 0L), queue.size());
    }

    /**
     * Verifies that looking up a FlowFile by UUID finds the one in the local partition only.
     */
    @Test
    public void testGetFlowFile() throws IOException {
        queue.setFlowFilePartitioner(new FlowFileSizePartitioner());

        // The FlowFileSizePartitioner routes by size, so each FlowFile's size is set to the index of its target partition.
        final Map<String, String> localAttributes = Collections.singletonMap("uuid", "local");
        final MockFlowFileRecord localFlowFile = new MockFlowFileRecord(localAttributes, determineLocalPartitionIndex());

        final Map<String, String> remoteAttributes = Collections.singletonMap("uuid", "remote");
        final MockFlowFileRecord remoteFlowFile = new MockFlowFileRecord(remoteAttributes, determineRemotePartitionIndex());

        queue.put(localFlowFile);
        queue.put(remoteFlowFile);

        assertSame(localFlowFile, queue.getFlowFile("local"));
        assertNull(queue.getFlowFile("remote"));
        assertNull(queue.getFlowFile("other"));
    }


    /**
     * Swaps out 100 FlowFiles of 100 bytes under each of the three partitions' swap names and
     * under the name "other-partition", then verifies the summary produced by swap recovery.
     */
    @Test
    public void testRecoverSwapFiles() throws IOException {
        for (int partitionIndex = 0; partitionIndex < 3; partitionIndex++) {
            final String partitionName = queue.getPartition(partitionIndex).getSwapPartitionName();

            final List<FlowFileRecord> flowFiles = new ArrayList<>();
            for (int i = 0; i < 100; i++) {
                flowFiles.add(new MockFlowFileRecord(100L));
            }

            swapManager.swapOut(flowFiles, queue, partitionName);
        }

        final List<FlowFileRecord> flowFiles = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            flowFiles.add(new MockFlowFileRecord(100L));
        }

        swapManager.swapOut(flowFiles, queue, "other-partition");

        final SwapSummary swapSummary = queue.recoverSwappedFlowFiles();
        assertEquals(399L, swapSummary.getMaxFlowFileId().longValue());
        assertEquals(400, swapSummary.getQueueSize().getObjectCount());
        assertEquals(400 * 100L, swapSummary.getQueueSize().getByteCount());
    }


    /**
     * Verifies that adding a node causes the queued FlowFiles to be redistributed when the
     * partitioner asks for a rebalance on cluster resize.
     */
    @Test(timeout = 10000)
    public void testChangeInClusterTopologyTriggersRebalance() throws InterruptedException {
        // Create partitioner that sends first 2 FlowFiles to Partition 0, next 2 to Partition 1, and then next 4 to Partition 3.
        queue.setFlowFilePartitioner(new StaticSequencePartitioner(new int[] {0, 0, 1, 1, 3, 3, 3, 3}, true));

        for (int i = 0; i < 4; i++) {
            queue.put(new MockFlowFileRecord());
        }

        assertEquals(2, queue.getPartition(0).size().getObjectCount());
        assertEquals(2, queue.getPartition(1).size().getObjectCount());
        assertEquals(0, queue.getPartition(2).size().getObjectCount());

        final Set<NodeIdentifier> updatedNodeIdentifiers = new HashSet<>(nodeIds);
        // Add a Node Identifier with an ID consisting of a bunch of Z's so that the new partition will be Partition Number 3.
        updatedNodeIdentifiers.add(createNodeIdentifier("ZZZZZZZZZZZZZZ"));

        queue.setNodeIdentifiers(updatedNodeIdentifiers, false);

        // Poll until all 4 FlowFiles end up in Partition 3; the test's timeout bounds the wait.
        final int[] expectedPartitionSizes = new int[] {0, 0, 0, 4};
        awaitPartitionSizes(expectedPartitionSizes, 4);
    }

    /**
     * Verifies that, with a partitioner that does not ask for a rebalance on cluster resize,
     * shrinking the cluster from three nodes to two leaves each remaining partition holding 2 FlowFiles.
     */
    @Test(timeout = 10000)
    public void testChangeInClusterTopologyTriggersRebalanceOnlyOnRemovedNodeIfNecessary() throws InterruptedException {
        // Create partitioner that sends the first 4 FlowFiles to Partitions 0, 1, 2, and 2, and then the next 2 to Partitions 0 and 1.
        queue.setFlowFilePartitioner(new StaticSequencePartitioner(new int[] {0, 1, 2, 2, 0, 1}, false));

        for (int i = 0; i < 4; i++) {
            queue.put(new MockFlowFileRecord());
        }

        assertEquals(1, queue.getPartition(0).size().getObjectCount());
        assertEquals(1, queue.getPartition(1).size().getObjectCount());
        assertEquals(2, queue.getPartition(2).size().getObjectCount());

        // Keep only the first two nodes.
        final Set<NodeIdentifier> updatedNodeIdentifiers = new HashSet<>();
        updatedNodeIdentifiers.add(nodeIds.get(0));
        updatedNodeIdentifiers.add(nodeIds.get(1));
        queue.setNodeIdentifiers(updatedNodeIdentifiers, false);

        final int[] expectedPartitionSizes = new int[] {2, 2};
        awaitPartitionSizes(expectedPartitionSizes, 2);
    }

    /**
     * Verifies that replacing the partitioner causes already-queued FlowFiles to be redistributed.
     */
    @Test(timeout = 10000)
    public void testChangeInPartitionerTriggersRebalance() throws InterruptedException {
        // Create partitioner that alternates the 4 FlowFiles between Partition 0 and Partition 1.
        queue.setFlowFilePartitioner(new StaticSequencePartitioner(new int[] {0, 1, 0, 1}, false));

        for (int i = 0; i < 4; i++) {
            queue.put(new MockFlowFileRecord());
        }

        assertEquals(2, queue.getPartition(0).size().getObjectCount());
        assertEquals(2, queue.getPartition(1).size().getObjectCount());
        assertEquals(0, queue.getPartition(2).size().getObjectCount());

        // The replacement partitioner sends 1 FlowFile to Partition 0, 1 to Partition 1, and 2 to Partition 2.
        queue.setFlowFilePartitioner(new StaticSequencePartitioner(new int[] {0, 1, 2, 2}, true));

        final int[] expectedPartitionSizes = new int[] {1, 1, 2};
        assertPartitionSizes(expectedPartitionSizes);
    }

    /**
     * Verifies the queue's behavior when it is created before the local Node Identifier is known
     * and the identifier is supplied afterwards through the topology event listener.
     */
    @Test(timeout = 100000)
    public void testLocalNodeIdentifierSet() throws InterruptedException {
        nodeIds.clear();

        final NodeIdentifier id1 = createNodeIdentifier();
        final NodeIdentifier id2 = createNodeIdentifier();
        final NodeIdentifier id3 = createNodeIdentifier();
        nodeIds.add(id1);
        nodeIds.add(id2);
        nodeIds.add(id3);

        when(clusterCoordinator.getLocalNodeIdentifier()).thenReturn(null);

        // Re-create the queue so that it is constructed while the local node is still unknown.
        queue = createQueue();

        queue.setFlowFilePartitioner(new RoundRobinPartitioner());

        // Queue up data without knowing the local node id.
        final Map<String, String> attributes = new HashMap<>();
        for (int i = 0; i < 6; i++) {
            attributes.put("i", String.valueOf(i));
            queue.put(new MockFlowFileRecord(attributes, 0));
        }

        for (int i = 0; i < 3; i++) {
            assertEquals(2, queue.getPartition(i).size().getObjectCount());
        }

        assertEquals(0, queue.getLocalPartition().size().getObjectCount());

        when(clusterCoordinator.getLocalNodeIdentifier()).thenReturn(id1);
        clusterTopologyEventListener.onLocalNodeIdentifierSet(id1);

        assertPartitionSizes(new int[] {2, 2, 2});

        // Poll until the local partition reports 2 FlowFiles; the test's timeout bounds the wait.
        while (queue.getLocalPartition().size().getObjectCount() != 2) {
            Thread.sleep(10L);
        }
    }

    /**
     * Polls until the object counts of all of the queue's partitions match the expected sizes.
     * There is no internal deadline; callers rely on their {@code @Test(timeout = ...)}.
     *
     * @param expectedSizes the expected object count of each partition, by partition index
     */
    private void assertPartitionSizes(final int[] expectedSizes) {
        awaitPartitionSizes(expectedSizes, queue.getPartitionCount());
    }

    /**
     * Polls every 10 milliseconds until the object counts of partitions {@code 0} through
     * {@code partitionCount - 1} match the expected sizes. Fails the test if the thread is interrupted.
     *
     * @param expectedSizes the expected object count of each partition, by partition index
     * @param partitionCount the number of partitions to inspect
     */
    private void awaitPartitionSizes(final int[] expectedSizes, final int partitionCount) {
        final int[] partitionSizes = new int[partitionCount];
        while (!Arrays.equals(expectedSizes, partitionSizes)) {
            try {
                Thread.sleep(10L);
            } catch (final InterruptedException e) {
                // Restore the interrupt status so that it is not lost when the test is failed.
                Thread.currentThread().interrupt();
                Assert.fail("Interrupted");
            }

            for (int i = 0; i < partitionSizes.length; i++) {
                partitionSizes[i] = queue.getPartition(i).size().getObjectCount();
            }
        }
    }


    /**
     * Partitioner that always chooses the partition at a fixed index and never asks for a rebalance.
     */
    private static class StaticFlowFilePartitioner implements FlowFilePartitioner {
        private final int partitionIndex;

        public StaticFlowFilePartitioner(final int partition) {
            this.partitionIndex = partition;
        }

        @Override
        public QueuePartition getPartition(final FlowFileRecord flowFile, final QueuePartition[] partitions, final QueuePartition localPartition) {
            return partitions[partitionIndex];
        }

        @Override
        public boolean isRebalanceOnClusterResize() {
            return false;
        }

        @Override
        public boolean isRebalanceOnFailure() {
            return false;
        }
    }

    /**
     * Partitioner that uses the FlowFile's size in bytes as the partition index and never asks for a rebalance.
     */
    private static class FlowFileSizePartitioner implements FlowFilePartitioner {
        @Override
        public QueuePartition getPartition(final FlowFileRecord flowFile, final QueuePartition[] partitions, final QueuePartition localPartition) {
            return partitions[(int) flowFile.getSize()];
        }

        @Override
        public boolean isRebalanceOnClusterResize() {
            return false;
        }

        @Override
        public boolean isRebalanceOnFailure() {
            return false;
        }
    }

    /**
     * Partitioner that walks through a fixed sequence of partition indices, consuming one entry
     * per call. Asking for more partitions than the sequence holds results in an
     * ArrayIndexOutOfBoundsException, so each test supplies exactly as many entries as it needs.
     */
    private static class StaticSequencePartitioner implements FlowFilePartitioner {
        private final int[] partitionIndices;
        private final boolean requireRebalance;
        // NOTE(review): plain, unsynchronized counter; presumably only one thread at a time asks for a partition - confirm.
        private int index = 0;

        public StaticSequencePartitioner(final int[] partitions, final boolean requireRebalance) {
            this.partitionIndices = partitions;
            this.requireRebalance = requireRebalance;
        }

        @Override
        public QueuePartition getPartition(final FlowFileRecord flowFile, final QueuePartition[] partitions, final QueuePartition localPartition) {
            final int partitionIndex = partitionIndices[index++];
            return partitions[partitionIndex];
        }

        @Override
        public boolean isRebalanceOnClusterResize() {
            return requireRebalance;
        }

        @Override
        public boolean isRebalanceOnFailure() {
            return false;
        }
    }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSwappablePriorityQueue.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSwappablePriorityQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSwappablePriorityQueue.java new file mode 100644 index 0000000..71ad257 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/TestSwappablePriorityQueue.java @@ -0,0 +1,471 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.controller.queue.clustered; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import org.apache.nifi.controller.MockFlowFileRecord; +import org.apache.nifi.controller.MockSwapManager; +import org.apache.nifi.controller.queue.DropFlowFileAction; +import org.apache.nifi.controller.queue.DropFlowFileRequest; +import org.apache.nifi.controller.queue.FlowFileQueue; +import org.apache.nifi.controller.queue.QueueSize; +import org.apache.nifi.controller.queue.SwappablePriorityQueue; +import org.apache.nifi.controller.repository.FlowFileRecord; +import org.apache.nifi.events.EventReporter; +import org.apache.nifi.flowfile.FlowFilePrioritizer; +import org.apache.nifi.util.MockFlowFile; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +public class TestSwappablePriorityQueue { + + private MockSwapManager swapManager; + private final EventReporter eventReporter = EventReporter.NO_OP; + private final FlowFileQueue flowFileQueue = Mockito.mock(FlowFileQueue.class); + private final DropFlowFileAction dropAction = (flowFiles, requestor) -> { + return new QueueSize(flowFiles.size(), flowFiles.stream().mapToLong(FlowFileRecord::getSize).sum()); + }; + + private SwappablePriorityQueue queue; + + @Before + public void setup() { + swapManager = new MockSwapManager(); + + when(flowFileQueue.getIdentifier()).thenReturn("unit-test"); + queue = new SwappablePriorityQueue(swapManager, 10000, eventReporter, flowFileQueue, dropAction, "local"); + } + + + @Test + public 
void testPrioritizer() { + final FlowFilePrioritizer prioritizer = (o1, o2) -> Long.compare(o1.getId(), o2.getId()); + queue.setPriorities(Collections.singletonList(prioritizer)); + + for (int i = 0; i < 5000; i++) { + queue.put(new MockFlowFile(i)); + } + + final Set<FlowFileRecord> expiredRecords = new HashSet<>(); + for (int i = 0; i < 5000; i++) { + final FlowFileRecord polled = queue.poll(expiredRecords, 500000L); + assertEquals(i, polled.getId()); + } + + // We can add flowfiles in reverse order (highest ID first) and we should still get the same order back when polling + for (int i = 0; i < 5000; i++) { + queue.put(new MockFlowFile(5000 - i)); + } + for (int i = 0; i < 5000; i++) { + final FlowFileRecord polled = queue.poll(expiredRecords, 500000L); + // ID's will start at 1, since the last FlowFile added will have ID of 5000 - 4999 + assertEquals(i + 1, polled.getId()); + } + + // Add FlowFiles again, then change prioritizer and ensure that the order is updated + for (int i = 0; i < 5000; i++) { + queue.put(new MockFlowFile(i)); + } + + final FlowFilePrioritizer reversePrioritizer = (o1, o2) -> Long.compare(o2.getId(), o1.getId()); + queue.setPriorities(Collections.singletonList(reversePrioritizer)); + + for (int i = 0; i < 5000; i++) { + final FlowFileRecord polled = queue.poll(expiredRecords, 500000L); + // ID's will start at 4999, since the last FlowFile added will have ID of 4999 (i < 5000, not i <= 5000). 
+ assertEquals(5000 - i - 1, polled.getId()); + } + } + + @Test + public void testPollWithOnlyExpiredFlowFile() { + final FlowFileRecord expiredFlowFile = mock(FlowFileRecord.class); + when(expiredFlowFile.getEntryDate()).thenReturn(System.currentTimeMillis() - 5000L); + queue.put(expiredFlowFile); + + final Set<FlowFileRecord> expiredRecords = new HashSet<>(); + final FlowFileRecord polled = queue.poll(expiredRecords, 4999); + assertNull(polled); + + assertEquals(1, expiredRecords.size()); + final FlowFileRecord expired = expiredRecords.iterator().next(); + assertSame(expiredFlowFile, expired); + } + + @Test + public void testPollWithExpiredAndUnexpired() { + final SwappablePriorityQueue queue = new SwappablePriorityQueue(swapManager, 100, eventReporter, flowFileQueue, dropAction, "local"); + + final FlowFileRecord expiredFlowFile = mock(FlowFileRecord.class); + when(expiredFlowFile.getEntryDate()).thenReturn(System.currentTimeMillis() - 5000L); + queue.put(expiredFlowFile); + + final FlowFileRecord unexpiredFlowFile = mock(FlowFileRecord.class); + when(unexpiredFlowFile.getEntryDate()).thenReturn(System.currentTimeMillis() + 500000L); + queue.put(unexpiredFlowFile); + + final Set<FlowFileRecord> expiredRecords = new HashSet<>(); + final FlowFileRecord polled = queue.poll(expiredRecords, 4999); + assertSame(unexpiredFlowFile, polled); + + assertEquals(1, expiredRecords.size()); + final FlowFileRecord expired = expiredRecords.iterator().next(); + assertSame(expiredFlowFile, expired); + } + + @Test + public void testEmpty() { + assertTrue(queue.isEmpty()); + assertTrue(queue.isActiveQueueEmpty()); + + for (int i = 0; i < 9; i++) { + queue.put(new MockFlowFileRecord()); + assertFalse(queue.isEmpty()); + assertFalse(queue.isActiveQueueEmpty()); + } + + queue.put(new MockFlowFileRecord()); + assertFalse(queue.isEmpty()); + assertFalse(queue.isActiveQueueEmpty()); + + final Set<FlowFileRecord> expiredRecords = new HashSet<>(); + final FlowFileRecord polled = 
queue.poll(expiredRecords, 500000); + assertNotNull(polled); + assertTrue(expiredRecords.isEmpty()); + + assertFalse(queue.isEmpty()); + assertFalse(queue.isActiveQueueEmpty()); + + // queue is still full because FlowFile has not yet been acknowledged. + queue.acknowledge(polled); + + // FlowFile has been acknowledged; queue should no longer be full. + assertFalse(queue.isEmpty()); + assertFalse(queue.isActiveQueueEmpty()); + } + + @Test + public void testSwapOutOccurs() { + for (int i = 0; i < 10000; i++) { + queue.put(new MockFlowFileRecord()); + assertEquals(0, swapManager.swapOutCalledCount); + assertEquals(i + 1, queue.size().getObjectCount()); + assertEquals(i + 1, queue.size().getByteCount()); + } + + for (int i = 0; i < 9999; i++) { + queue.put(new MockFlowFileRecord()); + assertEquals(0, swapManager.swapOutCalledCount); + assertEquals(i + 10001, queue.size().getObjectCount()); + assertEquals(i + 10001, queue.size().getByteCount()); + } + + queue.put(new MockFlowFileRecord(1000)); + assertEquals(1, swapManager.swapOutCalledCount); + assertEquals(20000, queue.size().getObjectCount()); + assertEquals(20999, queue.size().getByteCount()); + + assertEquals(10000, queue.getQueueDiagnostics().getActiveQueueSize().getObjectCount()); + } + + @Test + public void testLowestPrioritySwappedOutFirst() { + final List<FlowFilePrioritizer> prioritizers = new ArrayList<>(); + prioritizers.add((o1, o2) -> Long.compare(o1.getSize(), o2.getSize())); + queue.setPriorities(prioritizers); + + long maxSize = 20000; + for (int i = 1; i <= 20000; i++) { + queue.put(new MockFlowFileRecord(maxSize - i)); + } + + assertEquals(1, swapManager.swapOutCalledCount); + assertEquals(20000, queue.size().getObjectCount()); + + assertEquals(10000, queue.getQueueDiagnostics().getActiveQueueSize().getObjectCount()); + final List<FlowFileRecord> flowFiles = queue.poll(Integer.MAX_VALUE, new HashSet<FlowFileRecord>(), 500000); + assertEquals(10000, flowFiles.size()); + for (int i = 0; i < 10000; i++) 
{ + assertEquals(i, flowFiles.get(i).getSize()); + } + } + + + @Test + public void testSwapIn() { + for (int i = 1; i <= 20000; i++) { + queue.put(new MockFlowFileRecord()); + } + + assertEquals(1, swapManager.swappedOut.size()); + queue.put(new MockFlowFileRecord()); + assertEquals(1, swapManager.swappedOut.size()); + + final Set<FlowFileRecord> exp = new HashSet<>(); + for (int i = 0; i < 9999; i++) { + final FlowFileRecord flowFile = queue.poll(exp, 500000); + assertNotNull(flowFile); + assertEquals(1, queue.getQueueDiagnostics().getUnacknowledgedQueueSize().getObjectCount()); + assertEquals(1, queue.getQueueDiagnostics().getUnacknowledgedQueueSize().getByteCount()); + + queue.acknowledge(Collections.singleton(flowFile)); + assertEquals(0, queue.getQueueDiagnostics().getUnacknowledgedQueueSize().getObjectCount()); + assertEquals(0, queue.getQueueDiagnostics().getUnacknowledgedQueueSize().getByteCount()); + } + + assertEquals(0, swapManager.swapInCalledCount); + assertEquals(1, queue.getQueueDiagnostics().getActiveQueueSize().getObjectCount()); + assertNotNull(queue.poll(exp, 500000)); + + assertEquals(0, swapManager.swapInCalledCount); + assertEquals(0, queue.getQueueDiagnostics().getActiveQueueSize().getObjectCount()); + + assertEquals(1, swapManager.swapOutCalledCount); + + assertNotNull(queue.poll(exp, 500000)); // this should trigger a swap-in of 10,000 records, and then pull 1 off the top. 
+ assertEquals(1, swapManager.swapInCalledCount); + assertEquals(9999, queue.getQueueDiagnostics().getActiveQueueSize().getObjectCount()); + + assertTrue(swapManager.swappedOut.isEmpty()); + + queue.poll(exp, 500000); + } + + @Test + public void testSwapInWhenThresholdIsLessThanSwapSize() { + // create a queue where the swap threshold is less than 10k + queue = new SwappablePriorityQueue(swapManager, 1000, eventReporter, flowFileQueue, dropAction, null); + + for (int i = 1; i <= 20000; i++) { + queue.put(new MockFlowFileRecord()); + } + + assertEquals(1, swapManager.swappedOut.size()); + queue.put(new MockFlowFileRecord()); + assertEquals(1, swapManager.swappedOut.size()); + + final Set<FlowFileRecord> exp = new HashSet<>(); + + // At this point there should be: + // 1k flow files in the active queue + // 9,001 flow files in the swap queue + // 10k flow files swapped to disk + + for (int i = 0; i < 999; i++) { // + final FlowFileRecord flowFile = queue.poll(exp, 500000); + assertNotNull(flowFile); + assertEquals(1, queue.getQueueDiagnostics().getUnacknowledgedQueueSize().getObjectCount()); + assertEquals(1, queue.getQueueDiagnostics().getUnacknowledgedQueueSize().getByteCount()); + + queue.acknowledge(Collections.singleton(flowFile)); + assertEquals(0, queue.getQueueDiagnostics().getUnacknowledgedQueueSize().getObjectCount()); + assertEquals(0, queue.getQueueDiagnostics().getUnacknowledgedQueueSize().getByteCount()); + } + + assertEquals(0, swapManager.swapInCalledCount); + assertEquals(1, queue.getQueueDiagnostics().getActiveQueueSize().getObjectCount()); + assertNotNull(queue.poll(exp, 500000)); + + assertEquals(0, swapManager.swapInCalledCount); + assertEquals(0, queue.getQueueDiagnostics().getActiveQueueSize().getObjectCount()); + + assertEquals(1, swapManager.swapOutCalledCount); + + assertNotNull(queue.poll(exp, 500000)); // this should trigger a swap-in of 10,000 records, and then pull 1 off the top. 
+ assertEquals(1, swapManager.swapInCalledCount); + assertEquals(9999, queue.getQueueDiagnostics().getActiveQueueSize().getObjectCount()); + + assertTrue(swapManager.swappedOut.isEmpty()); + + queue.poll(exp, 500000); + } + + @Test + public void testQueueCountsUpdatedWhenIncompleteSwapFile() { + for (int i = 1; i <= 20000; i++) { + queue.put(new MockFlowFileRecord()); + } + + assertEquals(20000, queue.size().getObjectCount()); + assertEquals(20000, queue.size().getByteCount()); + + assertEquals(1, swapManager.swappedOut.size()); + + // when we swap in, cause an IncompleteSwapFileException to be + // thrown and contain only 9,999 of the 10,000 FlowFiles + swapManager.enableIncompleteSwapFileException(9999); + final Set<FlowFileRecord> expired = Collections.emptySet(); + FlowFileRecord flowFile; + + for (int i = 0; i < 10000; i++) { + flowFile = queue.poll(expired, 500000); + assertNotNull(flowFile); + queue.acknowledge(Collections.singleton(flowFile)); + } + + // 10,000 FlowFiles on queue - all swapped out + assertEquals(10000, queue.size().getObjectCount()); + assertEquals(10000, queue.size().getByteCount()); + assertEquals(1, swapManager.swappedOut.size()); + assertEquals(0, swapManager.swapInCalledCount); + + // Trigger swap in. This will remove 1 FlowFile from queue, leaving 9,999 but + // on swap in, we will get only 9,999 FlowFiles put onto the queue, and the queue size will + // be decremented by 10,000 (because the Swap File's header tells us that there are 10K + // FlowFiles, even though only 9999 are in the swap file) + flowFile = queue.poll(expired, 500000); + assertNotNull(flowFile); + queue.acknowledge(Collections.singleton(flowFile)); + + // size should be 9,998 because we lost 1 on Swap In, and then we pulled one above. 
+ assertEquals(9998, queue.size().getObjectCount()); + assertEquals(9998, queue.size().getByteCount()); + assertEquals(0, swapManager.swappedOut.size()); + assertEquals(1, swapManager.swapInCalledCount); + + for (int i = 0; i < 9998; i++) { + flowFile = queue.poll(expired, 500000); + assertNotNull("Null FlowFile when i = " + i, flowFile); + queue.acknowledge(Collections.singleton(flowFile)); + + final QueueSize queueSize = queue.size(); + assertEquals(9998 - i - 1, queueSize.getObjectCount()); + assertEquals(9998 - i - 1, queueSize.getByteCount()); + } + + final QueueSize queueSize = queue.size(); + assertEquals(0, queueSize.getObjectCount()); + assertEquals(0L, queueSize.getByteCount()); + + flowFile = queue.poll(expired, 500000); + assertNull(flowFile); + } + + @Test(timeout = 120000) + public void testDropSwappedFlowFiles() { + for (int i = 1; i <= 30000; i++) { + queue.put(new MockFlowFileRecord()); + } + + assertEquals(2, swapManager.swappedOut.size()); + final DropFlowFileRequest request = new DropFlowFileRequest("Unit Test"); + + queue.dropFlowFiles(request, "Unit Test"); + + assertEquals(0, queue.size().getObjectCount()); + assertEquals(0, queue.size().getByteCount()); + assertEquals(0, swapManager.swappedOut.size()); + assertEquals(2, swapManager.swapInCalledCount); + } + + + @Test(timeout = 5000) + public void testGetActiveFlowFilesReturnsAllActiveFlowFiles() throws InterruptedException { + for (int i = 0; i < 9999; i++) { + queue.put(new MockFlowFileRecord()); + } + + final List<FlowFileRecord> active = queue.getActiveFlowFiles(); + assertNotNull(active); + assertEquals(9999, active.size()); + } + + + @Test(timeout = 5000) + public void testListFlowFilesResultsLimited() throws InterruptedException { + for (int i = 0; i < 30050; i++) { + queue.put(new MockFlowFileRecord()); + } + + final List<FlowFileRecord> activeFlowFiles = queue.getActiveFlowFiles(); + assertNotNull(activeFlowFiles); + assertEquals(10000, activeFlowFiles.size()); + } + + + @Test + 
public void testOOMEFollowedBySuccessfulSwapIn() { + final List<FlowFileRecord> flowFiles = new ArrayList<>(); + for (int i = 0; i < 50000; i++) { + flowFiles.add(new MockFlowFileRecord()); + } + + queue.putAll(flowFiles); + + swapManager.failSwapInAfterN = 2; + swapManager.setSwapInFailure(new OutOfMemoryError("Intentional OOME for unit test")); + + final Set<FlowFileRecord> expiredRecords = new HashSet<>(); + for (int i = 0; i < 30000; i++) { + final FlowFileRecord polled = queue.poll(expiredRecords, 500000); + assertNotNull(polled); + } + + // verify that unexpected ERROR's are handled in such a way that we keep retrying + for (int i = 0; i < 3; i++) { + try { + queue.poll(expiredRecords, 500000); + Assert.fail("Expected OOME to be thrown"); + } catch (final OutOfMemoryError oome) { + // expected + } + } + + // verify that unexpected Runtime Exceptions are handled in such a way that we keep retrying + swapManager.setSwapInFailure(new NullPointerException("Intentional OOME for unit test")); + + for (int i = 0; i < 3; i++) { + try { + queue.poll(expiredRecords, 500000); + Assert.fail("Expected NPE to be thrown"); + } catch (final NullPointerException npe) { + // expected + } + } + + swapManager.failSwapInAfterN = -1; + + for (int i = 0; i < 20000; i++) { + final FlowFileRecord polled = queue.poll(expiredRecords, 500000); + assertNotNull(polled); + } + + queue.acknowledge(flowFiles); + assertNull(queue.poll(expiredRecords, 500000)); + assertEquals(0, queue.getQueueDiagnostics().getActiveQueueSize().getObjectCount()); + assertEquals(0, queue.size().getObjectCount()); + + assertTrue(swapManager.swappedOut.isEmpty()); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/client/async/nio/TestLoadBalanceSession.java ---------------------------------------------------------------------- diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/client/async/nio/TestLoadBalanceSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/client/async/nio/TestLoadBalanceSession.java new file mode 100644 index 0000000..efa5d73 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/client/async/nio/TestLoadBalanceSession.java @@ -0,0 +1,273 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.controller.queue.clustered.client.async.nio; + +import org.apache.nifi.controller.MockFlowFileRecord; +import org.apache.nifi.controller.queue.LoadBalanceCompression; +import org.apache.nifi.controller.queue.clustered.FlowFileContentAccess; +import org.apache.nifi.controller.queue.clustered.SimpleLimitThreshold; +import org.apache.nifi.controller.queue.clustered.client.StandardLoadBalanceFlowFileCodec; +import org.apache.nifi.controller.queue.clustered.client.async.TransactionFailureCallback; +import org.apache.nifi.controller.queue.clustered.protocol.LoadBalanceProtocolConstants; +import org.apache.nifi.controller.repository.FlowFileRecord; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.net.InetSocketAddress; +import java.net.ServerSocket; +import java.net.Socket; +import java.nio.channels.SocketChannel; +import java.util.Arrays; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.zip.CRC32; +import java.util.zip.CheckedOutputStream; +import java.util.zip.Checksum; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class TestLoadBalanceSession { + + private final TransactionFailureCallback NOP_FAILURE_CALLBACK = new TransactionFailureCallback() { + @Override + public void onTransactionFailed(final List<FlowFileRecord> flowFiles, final Exception cause, final TransactionPhase transactionPhase) { + } + + @Override + public boolean isRebalanceOnFailure() { + return false; + } + }; + + private ByteArrayOutputStream received; + private ServerSocket serverSocket; + private int port; + + @Before + public void 
setup() throws IOException { + received = new ByteArrayOutputStream(); + + serverSocket = new ServerSocket(0); + port = serverSocket.getLocalPort(); + + final Thread thread = new Thread(new Runnable() { + @Override + public void run() { + try (final Socket socket = serverSocket.accept()) { + final InputStream in = socket.getInputStream(); + int data; + + socket.getOutputStream().write(LoadBalanceProtocolConstants.VERSION_ACCEPTED); + socket.getOutputStream().write(LoadBalanceProtocolConstants.SPACE_AVAILABLE); + socket.getOutputStream().write(LoadBalanceProtocolConstants.CONFIRM_CHECKSUM); + socket.getOutputStream().write(LoadBalanceProtocolConstants.CONFIRM_COMPLETE_TRANSACTION); + + while ((data = in.read()) != -1) { + received.write(data); + } + } catch (IOException e) { + e.printStackTrace(); + Assert.fail(); + } + } + }); + thread.setDaemon(true); + thread.start(); + } + + @After + public void shutdown() throws IOException { + serverSocket.close(); + } + + @Test(timeout = 10000) + public void testSunnyCase() throws InterruptedException, IOException { + final Queue<FlowFileRecord> flowFiles = new LinkedList<>(); + final FlowFileRecord flowFile1 = new MockFlowFileRecord(5); + final FlowFileRecord flowFile2 = new MockFlowFileRecord(8); + flowFiles.offer(flowFile1); + flowFiles.offer(flowFile2); + + final Map<FlowFileRecord, InputStream> contentMap = new HashMap<>(); + contentMap.put(flowFile1, new ByteArrayInputStream("hello".getBytes())); + contentMap.put(flowFile2, new ByteArrayInputStream("good-bye".getBytes())); + + final FlowFileContentAccess contentAccess = contentMap::get; + + final RegisteredPartition partition = new RegisteredPartition("unit-test-connection", () -> false, + flowFiles::poll, NOP_FAILURE_CALLBACK, (ff) -> {}, () -> LoadBalanceCompression.DO_NOT_COMPRESS, () -> true); + + final SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("localhost", port)); + + socketChannel.configureBlocking(false); + final PeerChannel 
peerChannel = new PeerChannel(socketChannel, null, "unit-test"); + final LoadBalanceSession transaction = new LoadBalanceSession(partition, contentAccess, new StandardLoadBalanceFlowFileCodec(), peerChannel, 30000, + new SimpleLimitThreshold(100, 10_000_000)); + + Thread.sleep(100L); + + while (transaction.communicate()) { + } + + assertTrue(transaction.isComplete()); + socketChannel.close(); + + final Checksum expectedChecksum = new CRC32(); + final ByteArrayOutputStream expectedOut = new ByteArrayOutputStream(); + expectedOut.write(1); // Protocol Version + + final DataOutputStream expectedDos = new DataOutputStream(new CheckedOutputStream(expectedOut, expectedChecksum)); + expectedDos.writeUTF("unit-test-connection"); + + expectedDos.write(LoadBalanceProtocolConstants.CHECK_SPACE); + expectedDos.write(LoadBalanceProtocolConstants.MORE_FLOWFILES); + expectedDos.writeInt(68); // metadata length + expectedDos.writeInt(1); // 1 attribute + expectedDos.writeInt(4); // length of attribute + expectedDos.write("uuid".getBytes()); + expectedDos.writeInt(flowFile1.getAttribute("uuid").length()); + expectedDos.write(flowFile1.getAttribute("uuid").getBytes()); + expectedDos.writeLong(flowFile1.getLineageStartDate()); // lineage start date + expectedDos.writeLong(flowFile1.getEntryDate()); // entry date + expectedDos.write(LoadBalanceProtocolConstants.DATA_FRAME_FOLLOWS); + expectedDos.writeShort(5); + expectedDos.write("hello".getBytes()); + expectedDos.write(LoadBalanceProtocolConstants.NO_DATA_FRAME); + + expectedDos.write(LoadBalanceProtocolConstants.MORE_FLOWFILES); + expectedDos.writeInt(68); // metadata length + expectedDos.writeInt(1); // 1 attribute + expectedDos.writeInt(4); // length of attribute + expectedDos.write("uuid".getBytes()); + expectedDos.writeInt(flowFile2.getAttribute("uuid").length()); + expectedDos.write(flowFile2.getAttribute("uuid").getBytes()); + expectedDos.writeLong(flowFile2.getLineageStartDate()); // lineage start date + 
expectedDos.writeLong(flowFile2.getEntryDate()); // entry date + expectedDos.write(LoadBalanceProtocolConstants.DATA_FRAME_FOLLOWS); + expectedDos.writeShort(8); + expectedDos.write("good-bye".getBytes()); + expectedDos.write(LoadBalanceProtocolConstants.NO_DATA_FRAME); + + expectedDos.write(LoadBalanceProtocolConstants.NO_MORE_FLOWFILES); + expectedDos.writeLong(expectedChecksum.getValue()); + expectedDos.write(LoadBalanceProtocolConstants.COMPLETE_TRANSACTION); + + final byte[] expectedSent = expectedOut.toByteArray(); + + while (received.size() < expectedSent.length) { + Thread.sleep(10L); + } + final byte[] dataSent = received.toByteArray(); + + assertArrayEquals(expectedSent, dataSent); + + assertEquals(Arrays.asList(flowFile1, flowFile2), transaction.getFlowFilesSent()); + } + + + @Test(timeout = 10000) + public void testLargeContent() throws InterruptedException, IOException { + final byte[] content = new byte[66000]; + for (int i=0; i < 66000; i++) { + content[i] = 'A'; + } + + final Queue<FlowFileRecord> flowFiles = new LinkedList<>(); + final FlowFileRecord flowFile1 = new MockFlowFileRecord(content.length); + flowFiles.offer(flowFile1); + + final Map<FlowFileRecord, InputStream> contentMap = new HashMap<>(); + contentMap.put(flowFile1, new ByteArrayInputStream(content)); + + final FlowFileContentAccess contentAccess = contentMap::get; + + final RegisteredPartition partition = new RegisteredPartition("unit-test-connection", () -> false, + flowFiles::poll, NOP_FAILURE_CALLBACK, (ff) -> {}, () -> LoadBalanceCompression.DO_NOT_COMPRESS, () -> true); + + final SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("localhost", port)); + + socketChannel.configureBlocking(false); + final PeerChannel peerChannel = new PeerChannel(socketChannel, null, "unit-test"); + final LoadBalanceSession transaction = new LoadBalanceSession(partition, contentAccess, new StandardLoadBalanceFlowFileCodec(), peerChannel, 30000, + new SimpleLimitThreshold(100, 
10_000_000)); + + Thread.sleep(100L); + + while (transaction.communicate()) { + } + + socketChannel.close(); + + final Checksum expectedChecksum = new CRC32(); + final ByteArrayOutputStream expectedOut = new ByteArrayOutputStream(); + expectedOut.write(1); // Protocol Version + + final DataOutputStream expectedDos = new DataOutputStream(new CheckedOutputStream(expectedOut, expectedChecksum)); + + expectedDos.writeUTF("unit-test-connection"); + + expectedDos.write(LoadBalanceProtocolConstants.CHECK_SPACE); + expectedDos.write(LoadBalanceProtocolConstants.MORE_FLOWFILES); + expectedDos.writeInt(68); // metadata length + expectedDos.writeInt(1); // 1 attribute + expectedDos.writeInt(4); // length of attribute + expectedDos.write("uuid".getBytes()); + expectedDos.writeInt(flowFile1.getAttribute("uuid").length()); + expectedDos.write(flowFile1.getAttribute("uuid").getBytes()); + expectedDos.writeLong(flowFile1.getLineageStartDate()); // lineage start date + expectedDos.writeLong(flowFile1.getEntryDate()); // entry date + + // first data frame + expectedDos.write(LoadBalanceProtocolConstants.DATA_FRAME_FOLLOWS); + expectedDos.writeShort(LoadBalanceSession.MAX_DATA_FRAME_SIZE); + expectedDos.write(Arrays.copyOfRange(content, 0, LoadBalanceSession.MAX_DATA_FRAME_SIZE)); + + // second data frame + expectedDos.write(LoadBalanceProtocolConstants.DATA_FRAME_FOLLOWS); + expectedDos.writeShort(content.length - LoadBalanceSession.MAX_DATA_FRAME_SIZE); + expectedDos.write(Arrays.copyOfRange(content, LoadBalanceSession.MAX_DATA_FRAME_SIZE, content.length)); + expectedDos.write(LoadBalanceProtocolConstants.NO_DATA_FRAME); + + expectedDos.write(LoadBalanceProtocolConstants.NO_MORE_FLOWFILES); + expectedDos.writeLong(expectedChecksum.getValue()); + expectedDos.write(LoadBalanceProtocolConstants.COMPLETE_TRANSACTION); + + final byte[] expectedSent = expectedOut.toByteArray(); + + while (received.size() < expectedSent.length) { + Thread.sleep(10L); + } + final byte[] dataSent = 
received.toByteArray(); + + assertArrayEquals(expectedSent, dataSent); + + assertEquals(Arrays.asList(flowFile1), transaction.getFlowFilesSent()); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/619f1ffe/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/server/TestStandardLoadBalanceProtocol.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/server/TestStandardLoadBalanceProtocol.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/server/TestStandardLoadBalanceProtocol.java new file mode 100644 index 0000000..d020c12 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/server/TestStandardLoadBalanceProtocol.java @@ -0,0 +1,656 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.controller.queue.clustered.server; + +import org.apache.nifi.connectable.Connection; +import org.apache.nifi.controller.FlowController; +import org.apache.nifi.controller.queue.LoadBalanceCompression; +import org.apache.nifi.controller.queue.LoadBalancedFlowFileQueue; +import org.apache.nifi.controller.repository.ContentRepository; +import org.apache.nifi.controller.repository.FlowFileRecord; +import org.apache.nifi.controller.repository.FlowFileRepository; +import org.apache.nifi.controller.repository.RepositoryRecord; +import org.apache.nifi.controller.repository.claim.ContentClaim; +import org.apache.nifi.controller.repository.claim.ResourceClaim; +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.ProvenanceEventType; +import org.apache.nifi.provenance.ProvenanceRepository; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.EOFException; +import java.io.IOException; +import java.io.OutputStream; +import java.io.PipedInputStream; +import java.io.PipedOutputStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.zip.CRC32; +import java.util.zip.CheckedOutputStream; +import java.util.zip.Checksum; + +import static org.apache.nifi.controller.queue.clustered.protocol.LoadBalanceProtocolConstants.ABORT_TRANSACTION; +import static org.apache.nifi.controller.queue.clustered.protocol.LoadBalanceProtocolConstants.CHECK_SPACE; +import static 
org.apache.nifi.controller.queue.clustered.protocol.LoadBalanceProtocolConstants.COMPLETE_TRANSACTION; +import static org.apache.nifi.controller.queue.clustered.protocol.LoadBalanceProtocolConstants.CONFIRM_CHECKSUM; +import static org.apache.nifi.controller.queue.clustered.protocol.LoadBalanceProtocolConstants.CONFIRM_COMPLETE_TRANSACTION; +import static org.apache.nifi.controller.queue.clustered.protocol.LoadBalanceProtocolConstants.DATA_FRAME_FOLLOWS; +import static org.apache.nifi.controller.queue.clustered.protocol.LoadBalanceProtocolConstants.MORE_FLOWFILES; +import static org.apache.nifi.controller.queue.clustered.protocol.LoadBalanceProtocolConstants.NO_DATA_FRAME; +import static org.apache.nifi.controller.queue.clustered.protocol.LoadBalanceProtocolConstants.NO_MORE_FLOWFILES; +import static org.apache.nifi.controller.queue.clustered.protocol.LoadBalanceProtocolConstants.REJECT_CHECKSUM; +import static org.apache.nifi.controller.queue.clustered.protocol.LoadBalanceProtocolConstants.SKIP_SPACE_CHECK; +import static org.apache.nifi.controller.queue.clustered.protocol.LoadBalanceProtocolConstants.SPACE_AVAILABLE; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.anyCollection; +import static org.mockito.Matchers.anyList; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.when; + +public class TestStandardLoadBalanceProtocol { + private final LoadBalanceAuthorizer ALWAYS_AUTHORIZED = nodeIds -> {}; + private FlowFileRepository flowFileRepo; + private ContentRepository contentRepo; + private ProvenanceRepository provenanceRepo; + private FlowController flowController; + private LoadBalancedFlowFileQueue flowFileQueue; + + private List<RepositoryRecord> flowFileRepoUpdateRecords; + private List<ProvenanceEventRecord> provRepoUpdateRecords; + private List<FlowFileRecord> flowFileQueuePutRecords; + private 
List<FlowFileRecord> flowFileQueueReceiveRecords; + + private ConcurrentMap<ContentClaim, byte[]> claimContents; + + + @Before + public void setup() throws IOException { + flowFileQueuePutRecords = new ArrayList<>(); + flowFileQueueReceiveRecords = new ArrayList<>(); + flowFileRepoUpdateRecords = new ArrayList<>(); + provRepoUpdateRecords = new ArrayList<>(); + + flowFileRepo = Mockito.mock(FlowFileRepository.class); + contentRepo = Mockito.mock(ContentRepository.class); + provenanceRepo = Mockito.mock(ProvenanceRepository.class); + flowController = Mockito.mock(FlowController.class); + claimContents = new ConcurrentHashMap<>(); + + Mockito.doAnswer(new Answer<ContentClaim>() { + @Override + public ContentClaim answer(final InvocationOnMock invocation) throws Throwable { + final ContentClaim contentClaim = Mockito.mock(ContentClaim.class); + final ResourceClaim resourceClaim = Mockito.mock(ResourceClaim.class); + when(contentClaim.getResourceClaim()).thenReturn(resourceClaim); + return contentClaim; + } + }).when(contentRepo).create(Mockito.anyBoolean()); + + Mockito.doAnswer(new Answer<OutputStream>() { + @Override + public OutputStream answer(final InvocationOnMock invocation) throws Throwable { + final ContentClaim contentClaim = invocation.getArgumentAt(0, ContentClaim.class); + + final ByteArrayOutputStream baos = new ByteArrayOutputStream() { + @Override + public void close() throws IOException { + super.close(); + claimContents.put(contentClaim, toByteArray()); + } + }; + + return baos; + } + }).when(contentRepo).write(Mockito.any(ContentClaim.class)); + + final Connection connection = Mockito.mock(Connection.class); + when(flowController.getConnection(Mockito.anyString())).thenReturn(connection); + + flowFileQueue = Mockito.mock(LoadBalancedFlowFileQueue.class); + when(flowFileQueue.getLoadBalanceCompression()).thenReturn(LoadBalanceCompression.DO_NOT_COMPRESS); + when(connection.getFlowFileQueue()).thenReturn(flowFileQueue); + + Mockito.doAnswer(new 
Answer<Void>() { + @Override + public Void answer(final InvocationOnMock invocation) throws Throwable { + flowFileQueuePutRecords.addAll(invocation.getArgumentAt(0, Collection.class)); + return null; + } + }).when(flowFileQueue).putAll(anyCollection()); + + Mockito.doAnswer(new Answer<Void>() { + @Override + public Void answer(final InvocationOnMock invocation) throws Throwable { + flowFileQueueReceiveRecords.addAll(invocation.getArgumentAt(0, Collection.class)); + return null; + } + }).when(flowFileQueue).receiveFromPeer(anyCollection()); + + Mockito.doAnswer(new Answer<Void>() { + @Override + public Void answer(final InvocationOnMock invocation) throws Throwable { + flowFileRepoUpdateRecords.addAll(invocation.getArgumentAt(0, Collection.class)); + return null; + } + }).when(flowFileRepo).updateRepository(anyCollection()); + + Mockito.doAnswer(new Answer<Void>() { + @Override + public Void answer(final InvocationOnMock invocation) throws Throwable { + provRepoUpdateRecords.addAll(invocation.getArgumentAt(0, Collection.class)); + return null; + } + }).when(provenanceRepo).registerEvents(anyCollection()); + } + + + @Test + public void testSimpleFlowFileTransaction() throws IOException { + final StandardLoadBalanceProtocol protocol = new StandardLoadBalanceProtocol(flowFileRepo, contentRepo, provenanceRepo, flowController, ALWAYS_AUTHORIZED); + + final PipedInputStream serverInput = new PipedInputStream(); + final PipedOutputStream serverContentSource = new PipedOutputStream(); + serverInput.connect(serverContentSource); + + final ByteArrayOutputStream serverOutput = new ByteArrayOutputStream(); + + // Write connection ID + final Checksum checksum = new CRC32(); + final OutputStream checkedOutput = new CheckedOutputStream(serverContentSource, checksum); + final DataOutputStream dos = new DataOutputStream(checkedOutput); + dos.writeUTF("unit-test-connection-id"); + + final Map<String, String> attributes = new HashMap<>(); + attributes.put("a", "A"); + 
attributes.put("uuid", "unit-test-id"); + attributes.put("b", "B"); + + dos.write(CHECK_SPACE); + dos.write(MORE_FLOWFILES); + writeAttributes(attributes, dos); + writeContent("hello".getBytes(), dos); + dos.write(NO_MORE_FLOWFILES); + + dos.writeLong(checksum.getValue()); + dos.write(COMPLETE_TRANSACTION); + + protocol.receiveFlowFiles(serverInput, serverOutput, "Unit Test", 1, "unit.test"); + + final byte[] serverResponse = serverOutput.toByteArray(); + assertEquals(3, serverResponse.length); + assertEquals(SPACE_AVAILABLE, serverResponse[0]); + assertEquals(CONFIRM_CHECKSUM, serverResponse[1]); + assertEquals(CONFIRM_COMPLETE_TRANSACTION, serverResponse[2]); + + assertEquals(1, claimContents.size()); + final byte[] firstFlowFileContent = claimContents.values().iterator().next(); + assertArrayEquals("hello".getBytes(), firstFlowFileContent); + + Mockito.verify(flowFileRepo, times(1)).updateRepository(anyCollection()); + Mockito.verify(provenanceRepo, times(1)).registerEvents(anyList()); + Mockito.verify(flowFileQueue, times(0)).putAll(anyCollection()); + Mockito.verify(flowFileQueue, times(1)).receiveFromPeer(anyCollection()); + } + + @Test + public void testMultipleFlowFiles() throws IOException { + final StandardLoadBalanceProtocol protocol = new StandardLoadBalanceProtocol(flowFileRepo, contentRepo, provenanceRepo, flowController, ALWAYS_AUTHORIZED); + + final PipedInputStream serverInput = new PipedInputStream(); + final PipedOutputStream serverContentSource = new PipedOutputStream(); + serverInput.connect(serverContentSource); + + final ByteArrayOutputStream serverOutput = new ByteArrayOutputStream(); + + // Write connection ID + final Checksum checksum = new CRC32(); + final OutputStream checkedOutput = new CheckedOutputStream(serverContentSource, checksum); + final DataOutputStream dos = new DataOutputStream(checkedOutput); + dos.writeUTF("unit-test-connection-id"); + + final Map<String, String> attributes = new HashMap<>(); + attributes.put("a", "A"); + 
attributes.put("uuid", "unit-test-id"); + attributes.put("b", "B"); + + // Send 4 FlowFiles. + dos.write(CHECK_SPACE); + dos.write(MORE_FLOWFILES); + writeAttributes(attributes, dos); + writeContent("hello".getBytes(), dos); + + dos.write(MORE_FLOWFILES); + writeAttributes(Collections.singletonMap("uuid", "unit-test-id-2"), dos); + writeContent(null, dos); + + dos.write(MORE_FLOWFILES); + writeAttributes(Collections.singletonMap("uuid", "unit-test-id-3"), dos); + writeContent("greetings".getBytes(), dos); + + dos.write(MORE_FLOWFILES); + writeAttributes(Collections.singletonMap("uuid", "unit-test-id-4"), dos); + writeContent(new byte[0], dos); + + dos.write(NO_MORE_FLOWFILES); + + dos.writeLong(checksum.getValue()); + dos.write(COMPLETE_TRANSACTION); + + protocol.receiveFlowFiles(serverInput, serverOutput, "Unit Test", 1, "unit.test"); + + final byte[] serverResponse = serverOutput.toByteArray(); + assertEquals(3, serverResponse.length); + assertEquals(SPACE_AVAILABLE, serverResponse[0]); + assertEquals(CONFIRM_CHECKSUM, serverResponse[1]); + assertEquals(CONFIRM_COMPLETE_TRANSACTION, serverResponse[2]); + + assertEquals(1, claimContents.size()); + final byte[] bytes = claimContents.values().iterator().next(); + assertTrue(Arrays.equals("hellogreetings".getBytes(), bytes) || Arrays.equals("greetingshello".getBytes(), bytes)); + + assertEquals(4, flowFileRepoUpdateRecords.size()); + assertEquals(4, provRepoUpdateRecords.size()); + assertEquals(0, flowFileQueuePutRecords.size()); + assertEquals(4, flowFileQueueReceiveRecords.size()); + + assertTrue(provRepoUpdateRecords.stream().allMatch(event -> event.getEventType() == ProvenanceEventType.RECEIVE)); + } + + + @Test + public void testMultipleFlowFilesWithoutCheckingSpace() throws IOException { + final StandardLoadBalanceProtocol protocol = new StandardLoadBalanceProtocol(flowFileRepo, contentRepo, provenanceRepo, flowController, ALWAYS_AUTHORIZED); + + final PipedInputStream serverInput = new PipedInputStream(); + 
final PipedOutputStream serverContentSource = new PipedOutputStream(); + serverInput.connect(serverContentSource); + + final ByteArrayOutputStream serverOutput = new ByteArrayOutputStream(); + + // Write connection ID + final Checksum checksum = new CRC32(); + final OutputStream checkedOutput = new CheckedOutputStream(serverContentSource, checksum); + final DataOutputStream dos = new DataOutputStream(checkedOutput); + dos.writeUTF("unit-test-connection-id"); + + final Map<String, String> attributes = new HashMap<>(); + attributes.put("a", "A"); + attributes.put("uuid", "unit-test-id"); + attributes.put("b", "B"); + + // Send 4 FlowFiles. + dos.write(SKIP_SPACE_CHECK); + dos.write(MORE_FLOWFILES); + writeAttributes(attributes, dos); + writeContent("hello".getBytes(), dos); + + dos.write(MORE_FLOWFILES); + writeAttributes(Collections.singletonMap("uuid", "unit-test-id-2"), dos); + writeContent(null, dos); + + dos.write(MORE_FLOWFILES); + writeAttributes(Collections.singletonMap("uuid", "unit-test-id-3"), dos); + writeContent("greetings".getBytes(), dos); + + dos.write(MORE_FLOWFILES); + writeAttributes(Collections.singletonMap("uuid", "unit-test-id-4"), dos); + writeContent(new byte[0], dos); + + dos.write(NO_MORE_FLOWFILES); + + dos.writeLong(checksum.getValue()); + dos.write(COMPLETE_TRANSACTION); + + protocol.receiveFlowFiles(serverInput, serverOutput, "Unit Test", 1, "unit.test"); + + final byte[] serverResponse = serverOutput.toByteArray(); + assertEquals(2, serverResponse.length); + assertEquals(CONFIRM_CHECKSUM, serverResponse[0]); + assertEquals(CONFIRM_COMPLETE_TRANSACTION, serverResponse[1]); + + assertEquals(1, claimContents.size()); + final byte[] bytes = claimContents.values().iterator().next(); + assertTrue(Arrays.equals("hellogreetings".getBytes(), bytes) || Arrays.equals("greetingshello".getBytes(), bytes)); + + assertEquals(4, flowFileRepoUpdateRecords.size()); + assertEquals(4, provRepoUpdateRecords.size()); + assertEquals(0, 
flowFileQueuePutRecords.size()); + assertEquals(4, flowFileQueueReceiveRecords.size()); + + assertTrue(provRepoUpdateRecords.stream().allMatch(event -> event.getEventType() == ProvenanceEventType.RECEIVE)); + } + + @Test + public void testEofExceptionMultipleFlowFiles() throws IOException { + final StandardLoadBalanceProtocol protocol = new StandardLoadBalanceProtocol(flowFileRepo, contentRepo, provenanceRepo, flowController, ALWAYS_AUTHORIZED); + + final PipedInputStream serverInput = new PipedInputStream(); + final PipedOutputStream serverContentSource = new PipedOutputStream(); + serverInput.connect(serverContentSource); + + final ByteArrayOutputStream serverOutput = new ByteArrayOutputStream(); + + // Write connection ID + final Checksum checksum = new CRC32(); + final OutputStream checkedOutput = new CheckedOutputStream(serverContentSource, checksum); + final DataOutputStream dos = new DataOutputStream(checkedOutput); + dos.writeUTF("unit-test-connection-id"); + + final Map<String, String> attributes = new HashMap<>(); + attributes.put("a", "A"); + attributes.put("uuid", "unit-test-id"); + attributes.put("b", "B"); + + // Send 4 FlowFiles. 
+ dos.write(CHECK_SPACE); + dos.write(MORE_FLOWFILES); + writeAttributes(attributes, dos); + writeContent("hello".getBytes(), dos); + + dos.write(MORE_FLOWFILES); + writeAttributes(Collections.singletonMap("uuid", "unit-test-id-2"), dos); + writeContent(null, dos); + + dos.write(MORE_FLOWFILES); + writeAttributes(Collections.singletonMap("uuid", "unit-test-id-3"), dos); + writeContent("greetings".getBytes(), dos); + + dos.write(MORE_FLOWFILES); + writeAttributes(Collections.singletonMap("uuid", "unit-test-id-4"), dos); + writeContent(new byte[0], dos); + + dos.flush(); + dos.close(); + + try { + protocol.receiveFlowFiles(serverInput, serverOutput, "Unit Test", 1, "unit.test"); + Assert.fail("Expected EOFException but none was thrown"); + } catch (final EOFException eof) { + // expected + } + + final byte[] serverResponse = serverOutput.toByteArray(); + assertEquals(1, serverResponse.length); + assertEquals(SPACE_AVAILABLE, serverResponse[0]); + + assertEquals(1, claimContents.size()); + assertArrayEquals("hellogreetings".getBytes(), claimContents.values().iterator().next()); + + assertEquals(0, flowFileRepoUpdateRecords.size()); + assertEquals(0, provRepoUpdateRecords.size()); + assertEquals(0, flowFileQueuePutRecords.size()); + } + + @Test + public void testBadChecksum() throws IOException { + final StandardLoadBalanceProtocol protocol = new StandardLoadBalanceProtocol(flowFileRepo, contentRepo, provenanceRepo, flowController, ALWAYS_AUTHORIZED); + + final PipedInputStream serverInput = new PipedInputStream(); + final PipedOutputStream serverContentSource = new PipedOutputStream(); + serverInput.connect(serverContentSource); + + final ByteArrayOutputStream serverOutput = new ByteArrayOutputStream(); + + // Write connection ID + final Checksum checksum = new CRC32(); + final OutputStream checkedOutput = new CheckedOutputStream(serverContentSource, checksum); + final DataOutputStream dos = new DataOutputStream(checkedOutput); + 
dos.writeUTF("unit-test-connection-id"); + + final Map<String, String> attributes = new HashMap<>(); + attributes.put("uuid", "unit-test-id"); + + dos.write(CHECK_SPACE); + dos.write(MORE_FLOWFILES); + writeAttributes(attributes, dos); + writeContent("hello".getBytes(), dos); + dos.write(NO_MORE_FLOWFILES); + + dos.writeLong(1L); // Write bad checksum. + dos.write(COMPLETE_TRANSACTION); + + try { + protocol.receiveFlowFiles(serverInput, serverOutput, "Unit Test", 1, "unit.test"); + Assert.fail("Expected TransactionAbortedException but none was thrown"); + } catch (final TransactionAbortedException e) { + // expected + } + + final byte[] serverResponse = serverOutput.toByteArray(); + assertEquals(2, serverResponse.length); + assertEquals(SPACE_AVAILABLE, serverResponse[0]); + assertEquals(REJECT_CHECKSUM, serverResponse[1]); + + assertEquals(1, claimContents.size()); + final byte[] firstFlowFileContent = claimContents.values().iterator().next(); + assertArrayEquals("hello".getBytes(), firstFlowFileContent); + + Mockito.verify(flowFileRepo, times(0)).updateRepository(anyCollection()); + Mockito.verify(provenanceRepo, times(0)).registerEvents(anyList()); + Mockito.verify(flowFileQueue, times(0)).putAll(anyCollection()); + Mockito.verify(contentRepo, times(1)).decrementClaimantCount(claimContents.keySet().iterator().next()); + } + + @Test + public void testEofWritingContent() throws IOException { + final StandardLoadBalanceProtocol protocol = new StandardLoadBalanceProtocol(flowFileRepo, contentRepo, provenanceRepo, flowController, ALWAYS_AUTHORIZED); + + final PipedInputStream serverInput = new PipedInputStream(); + final PipedOutputStream serverContentSource = new PipedOutputStream(); + serverInput.connect(serverContentSource); + + final ByteArrayOutputStream serverOutput = new ByteArrayOutputStream(); + + // Write connection ID + final Checksum checksum = new CRC32(); + final OutputStream checkedOutput = new CheckedOutputStream(serverContentSource, checksum); + 
final DataOutputStream dos = new DataOutputStream(checkedOutput); + dos.writeUTF("unit-test-connection-id"); + + final Map<String, String> attributes = new HashMap<>(); + attributes.put("uuid", "unit-test-id"); + + dos.write(CHECK_SPACE); + dos.write(MORE_FLOWFILES); + writeAttributes(attributes, dos); + + // Indicate 45 byte data frame, then stop after 5 bytes. + dos.write(DATA_FRAME_FOLLOWS); + dos.writeShort(45); + dos.write("hello".getBytes()); + dos.flush(); + dos.close(); + + try { + protocol.receiveFlowFiles(serverInput, serverOutput, "Unit Test", 1, "unit.test"); + Assert.fail("Expected EOFException but none was thrown"); + } catch (final EOFException e) { + // expected + } + + final byte[] serverResponse = serverOutput.toByteArray(); + assertEquals(1, serverResponse.length); + assertEquals(SPACE_AVAILABLE, serverResponse[0]); + + assertEquals(1, claimContents.size()); + final byte[] firstFlowFileContent = claimContents.values().iterator().next(); + assertArrayEquals(new byte[0], firstFlowFileContent); + + Mockito.verify(flowFileRepo, times(0)).updateRepository(anyCollection()); + Mockito.verify(provenanceRepo, times(0)).registerEvents(anyList()); + Mockito.verify(flowFileQueue, times(0)).putAll(anyCollection()); + Mockito.verify(contentRepo, times(1)).decrementClaimantCount(claimContents.keySet().iterator().next()); + } + + @Test + public void testAbortAfterChecksumConfirmation() throws IOException { + final StandardLoadBalanceProtocol protocol = new StandardLoadBalanceProtocol(flowFileRepo, contentRepo, provenanceRepo, flowController, ALWAYS_AUTHORIZED); + + final PipedInputStream serverInput = new PipedInputStream(); + final PipedOutputStream serverContentSource = new PipedOutputStream(); + serverInput.connect(serverContentSource); + + final ByteArrayOutputStream serverOutput = new ByteArrayOutputStream(); + + // Write connection ID + final Checksum checksum = new CRC32(); + final OutputStream checkedOutput = new CheckedOutputStream(serverContentSource, 
checksum); + final DataOutputStream dos = new DataOutputStream(checkedOutput); + dos.writeUTF("unit-test-connection-id"); + + final Map<String, String> attributes = new HashMap<>(); + attributes.put("uuid", "unit-test-id"); + + dos.write(CHECK_SPACE); + dos.write(MORE_FLOWFILES); + writeAttributes(attributes, dos); + writeContent("hello".getBytes(), dos); + dos.write(NO_MORE_FLOWFILES); + + dos.writeLong(checksum.getValue()); + dos.write(ABORT_TRANSACTION); + + try { + protocol.receiveFlowFiles(serverInput, serverOutput, "Unit Test", 1, "unit.test"); + Assert.fail("Expected TransactionAbortedException but none was thrown"); + } catch (final TransactionAbortedException e) { + // expected + } + + final byte[] serverResponse = serverOutput.toByteArray(); + assertEquals(2, serverResponse.length); + assertEquals(SPACE_AVAILABLE, serverResponse[0]); + assertEquals(CONFIRM_CHECKSUM, serverResponse[1]); + + assertEquals(1, claimContents.size()); + final byte[] firstFlowFileContent = claimContents.values().iterator().next(); + assertArrayEquals("hello".getBytes(), firstFlowFileContent); + + Mockito.verify(flowFileRepo, times(0)).updateRepository(anyCollection()); + Mockito.verify(provenanceRepo, times(0)).registerEvents(anyList()); + Mockito.verify(flowFileQueue, times(0)).putAll(anyCollection()); + Mockito.verify(contentRepo, times(1)).decrementClaimantCount(claimContents.keySet().iterator().next()); + } + + @Test + public void testFlowFileNoContent() throws IOException { + final StandardLoadBalanceProtocol protocol = new StandardLoadBalanceProtocol(flowFileRepo, contentRepo, provenanceRepo, flowController, ALWAYS_AUTHORIZED); + + final PipedInputStream serverInput = new PipedInputStream(); + final PipedOutputStream serverContentSource = new PipedOutputStream(); + serverInput.connect(serverContentSource); + + final ByteArrayOutputStream serverOutput = new ByteArrayOutputStream(); + + // Write connection ID + final Checksum checksum = new CRC32(); + final OutputStream 
checkedOutput = new CheckedOutputStream(serverContentSource, checksum); + final DataOutputStream dos = new DataOutputStream(checkedOutput); + dos.writeUTF("unit-test-connection-id"); + + final Map<String, String> attributes = new HashMap<>(); + attributes.put("uuid", "unit-test-id"); + + dos.write(CHECK_SPACE); + dos.write(MORE_FLOWFILES); + writeAttributes(attributes, dos); + writeContent(null, dos); + dos.write(NO_MORE_FLOWFILES); + + dos.writeLong(checksum.getValue()); + dos.write(COMPLETE_TRANSACTION); + + protocol.receiveFlowFiles(serverInput, serverOutput, "Unit Test", 1, "unit.test"); + + final byte[] serverResponse = serverOutput.toByteArray(); + assertEquals(3, serverResponse.length); + assertEquals(SPACE_AVAILABLE, serverResponse[0]); + assertEquals(CONFIRM_CHECKSUM, serverResponse[1]); + assertEquals(CONFIRM_COMPLETE_TRANSACTION, serverResponse[2]); + + assertEquals(1, claimContents.size()); + assertEquals(0, claimContents.values().iterator().next().length); + + Mockito.verify(flowFileRepo, times(1)).updateRepository(anyCollection()); + Mockito.verify(provenanceRepo, times(1)).registerEvents(anyList()); + Mockito.verify(flowFileQueue, times(0)).putAll(anyCollection()); + Mockito.verify(flowFileQueue, times(1)).receiveFromPeer(anyCollection()); + } + + private void writeAttributes(final Map<String, String> attributes, final DataOutputStream dos) throws IOException { + try (final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + final DataOutputStream out = new DataOutputStream(baos)) { + out.writeInt(attributes.size()); + + for (final Map.Entry<String, String> entry : attributes.entrySet()) { + final byte[] key = entry.getKey().getBytes(); + out.writeInt(key.length); + out.write(key); + + final byte[] value = entry.getValue().getBytes(); + out.writeInt(value.length); + out.write(value); + } + + out.writeLong(0L); // lineage start date + out.writeLong(0L); // entry date + + dos.writeInt(baos.size()); + baos.writeTo(dos); + } + + } + + private 
void writeContent(final byte[] content, final DataOutputStream out) throws IOException { + if (content == null) { + out.write(NO_DATA_FRAME); + return; + } + + int iterations = content.length / 65535; + if (content.length % 65535 > 0) { + iterations++; + } + + for (int i=0; i < iterations; i++) { + final int offset = i * 65536; + final int length = Math.min(content.length - offset, 65535); + + out.write(DATA_FRAME_FOLLOWS); + out.writeShort(length); + out.write(content, offset, length); + } + + out.write(NO_DATA_FRAME); + } +}
