Repository: nifi Updated Branches: refs/heads/master 1f2cf4bc6 -> 4c10b47e6
NIFI-5771: Ensure that we only increment claimant count for content claim if we have a FlowFile that references it This closes #3118. Signed-off-by: Bryan Bende <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/4c10b47e Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/4c10b47e Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/4c10b47e Branch: refs/heads/master Commit: 4c10b47e602741adc52ad693a9bc56b9964cd7ef Parents: 1f2cf4b Author: Mark Payne <[email protected]> Authored: Wed Oct 31 13:11:58 2018 -0400 Committer: Bryan Bende <[email protected]> Committed: Wed Oct 31 15:25:24 2018 -0400 ---------------------------------------------------------------------- .../server/StandardLoadBalanceProtocol.java | 26 +++++++++++++------- .../queue/clustered/LoadBalancedQueueIT.java | 4 +-- .../server/TestStandardLoadBalanceProtocol.java | 12 ++++++--- 3 files changed, 28 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/4c10b47e/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java index f508d12..dc780db 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/StandardLoadBalanceProtocol.java @@ -285,17 +285,14 @@ public class StandardLoadBalanceProtocol implements LoadBalanceProtocol { if (contentClaim == null) { contentClaim = contentRepository.create(false); contentClaimOut = contentRepository.write(contentClaim); - } else { - contentRepository.incrementClaimaintCount(contentClaim); } - final RemoteFlowFileRecord flowFile; - try { - flowFile = receiveFlowFile(dataIn, contentClaimOut, contentClaim, claimOffset, protocolVersion, peerDescription, compression); - } catch (final Exception e) { - contentRepository.decrementClaimantCount(contentClaim); - throw e; - } + final RemoteFlowFileRecord flowFile = receiveFlowFile(dataIn, contentClaimOut, contentClaim, claimOffset, protocolVersion, peerDescription, compression); + + // The FlowFile's Content Claim will either be null or equal to the provided Content Claim. + // Incrementing the FlowFile's content claim will increment the count for the provided Content Claim, if it was + // assigned to the FlowFIle, or call incrementClaimantCount with an argument of null, which will do nothing. + contentRepository.incrementClaimaintCount(flowFile.getFlowFile().getContentClaim()); flowFilesReceived.add(flowFile); @@ -307,8 +304,17 @@ public class StandardLoadBalanceProtocol implements LoadBalanceProtocol { } } + // When the Content Claim is created initially, it has a Claimaint Count of 1. We then increment the Claimant Count for each FlowFile that we add to the Content Claim, + // which means that the claimant count is currently 1 larger than it needs to be. So we will decrement the claimant count now. If that results in a count of 0, then + // we can go ahead and remove the Content Claim, since we know it's not being referenced. + final int count = contentRepository.decrementClaimantCount(contentClaim); + verifyChecksum(checksum, in, out, peerDescription, flowFilesReceived.size()); completeTransaction(in, out, peerDescription, flowFilesReceived, connectionId, startTimestamp, (LoadBalancedFlowFileQueue) flowFileQueue); + + if (count == 0) { + contentRepository.remove(contentClaim); + } } catch (final Exception e) { // If any Exception occurs, we need to decrement the claimant counts for the Content Claims that we wrote to because // they are no longer needed. @@ -316,6 +322,8 @@ public class StandardLoadBalanceProtocol implements LoadBalanceProtocol { contentRepository.decrementClaimantCount(remoteFlowFile.getFlowFile().getContentClaim()); } + contentRepository.remove(contentClaim); + throw e; } http://git-wip-us.apache.org/repos/asf/nifi/blob/4c10b47e/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/LoadBalancedQueueIT.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/LoadBalancedQueueIT.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/LoadBalancedQueueIT.java index 4871d72..e947b1c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/LoadBalancedQueueIT.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/LoadBalancedQueueIT.java @@ -1211,7 +1211,7 @@ public class LoadBalancedQueueIT { localNodeId = new NodeIdentifier("unit-test-local", "localhost", 7090, "localhost", 7090, "localhost", 7090, null, null, null, false, null); nodeIdentifiers.add(localNodeId); - when(serverQueue.isFull()).thenReturn(true); + when(serverQueue.isLocalPartitionFull()).thenReturn(true); // Create the server final int timeoutMillis = 30000; @@ -1266,7 +1266,7 @@ public class LoadBalancedQueueIT { assertEquals(2, flowFileQueue.size().getObjectCount()); // Enable data to be transferred - when(serverQueue.isFull()).thenReturn(false); + when(serverQueue.isLocalPartitionFull()).thenReturn(false); while (clientRepoRecords.size() != 1) { Thread.sleep(10L); http://git-wip-us.apache.org/repos/asf/nifi/blob/4c10b47e/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/server/TestStandardLoadBalanceProtocol.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/server/TestStandardLoadBalanceProtocol.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/server/TestStandardLoadBalanceProtocol.java index 94f992f..c801f8c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/server/TestStandardLoadBalanceProtocol.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/queue/clustered/server/TestStandardLoadBalanceProtocol.java @@ -458,7 +458,9 @@ public class TestStandardLoadBalanceProtocol { Mockito.verify(flowFileRepo, times(0)).updateRepository(anyCollection()); Mockito.verify(provenanceRepo, times(0)).registerEvents(anyList()); Mockito.verify(flowFileQueue, times(0)).putAll(anyCollection()); - Mockito.verify(contentRepo, times(1)).decrementClaimantCount(claimContents.keySet().iterator().next()); + Mockito.verify(contentRepo, times(1)).incrementClaimaintCount(claimContents.keySet().iterator().next()); + Mockito.verify(contentRepo, times(2)).decrementClaimantCount(claimContents.keySet().iterator().next()); + Mockito.verify(contentRepo, times(1)).remove(claimContents.keySet().iterator().next()); } @Test @@ -509,7 +511,9 @@ public class TestStandardLoadBalanceProtocol { Mockito.verify(flowFileRepo, times(0)).updateRepository(anyCollection()); Mockito.verify(provenanceRepo, times(0)).registerEvents(anyList()); Mockito.verify(flowFileQueue, times(0)).putAll(anyCollection()); - Mockito.verify(contentRepo, times(1)).decrementClaimantCount(claimContents.keySet().iterator().next()); + Mockito.verify(contentRepo, times(0)).incrementClaimaintCount(claimContents.keySet().iterator().next()); + Mockito.verify(contentRepo, times(0)).decrementClaimantCount(claimContents.keySet().iterator().next()); + Mockito.verify(contentRepo, times(1)).remove(claimContents.keySet().iterator().next()); } @Test @@ -559,7 +563,9 @@ public class TestStandardLoadBalanceProtocol { Mockito.verify(flowFileRepo, times(0)).updateRepository(anyCollection()); Mockito.verify(provenanceRepo, times(0)).registerEvents(anyList()); Mockito.verify(flowFileQueue, times(0)).putAll(anyCollection()); - Mockito.verify(contentRepo, times(1)).decrementClaimantCount(claimContents.keySet().iterator().next()); + Mockito.verify(contentRepo, times(1)).incrementClaimaintCount(claimContents.keySet().iterator().next()); + Mockito.verify(contentRepo, times(2)).decrementClaimantCount(claimContents.keySet().iterator().next()); + Mockito.verify(contentRepo, times(1)).remove(claimContents.keySet().iterator().next()); } @Test
