This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new fea17d0  NIFI-5919: Addressed a race condition that can exist if adding FlowFiles to a FlowFileQueue before adjusting the size of the queue to account for the FlowFiles
fea17d0 is described below

commit fea17d0ca875d2450c21b69dc0c6c241374834e5
Author: Mark Payne <[email protected]>
AuthorDate: Fri Dec 28 10:35:48 2018 -0500

    NIFI-5919: Addressed a race condition that can exist if adding FlowFiles to a FlowFileQueue before adjusting the size of the queue to account for the FlowFiles
    
    Signed-off-by: Pierre Villard <[email protected]>
    
    This closes #3238.
---
 .../clustered/SocketLoadBalancedFlowFileQueue.java   | 20 +++++++++++++++++++-
 1 file changed, 19 insertions(+), 1 deletion(-)

diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java
index 84731f7..353af49 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java
@@ -769,8 +769,26 @@ public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue imple
                 putAll(flowFiles);
             } else {
                 logger.debug("Received the following FlowFiles from Peer: {}. Will accept FlowFiles to the local partition", flowFiles);
-                localPartition.putAll(flowFiles);
+
+                // As explained in the putAllAndGetPartitions() method, we must ensure that we call adjustSize() before we
+                // put the FlowFiles on the queue. Otherwise, we will encounter a race condition. Specifically, that race condition
+                // can play out like so:
+                //
+                // Thread 1: Call localPartition.putAll() when the queue is empty (has a queue size of 0) but has not yet adjusted the size.
+                // Thread 2: Call poll() to obtain the FlowFile just received.
+                // Thread 2: Transfer the FlowFile to some Relationship
+                // Thread 2: Commit the session, which will call acknowledge on this queue.
+                // Thread 2: The acknowledge() method attempts to decrement the size of the queue to -1.
+                //           This causes an Exception to be thrown and the queue size to remain at 0.
+                //           However, the FlowFile has already been successfully transferred to the next Queue.
+                // Thread 1: Call adjustSize() to increment the size of the queue to 1 FlowFile.
+                //
+                // In this scenario, we now have no FlowFiles in the queue. However, the queue size is set to 1.
+                // We can avoid this race condition by simply ensuring that we call adjustSize() before making the FlowFiles
+                // available on the queue. This way, we cannot possibly obtain the FlowFiles and process/acknowledge them before the queue
+                // size has been updated to account for them and therefore we will not attempt to assign a negative queue size.
                 adjustSize(flowFiles.size(), flowFiles.stream().mapToLong(FlowFileRecord::getSize).sum());
+                localPartition.putAll(flowFiles);
             }
         } finally {
             partitionReadLock.unlock();
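
The interleaving described in the comment above can be replayed deterministically on a single thread. The following is a minimal sketch only: `CountedQueue` and `QueueSizeOrderingDemo` are hypothetical stand-ins, not NiFi classes, and the simplified `adjustSize(int)`, `put()`, `poll()`, and `acknowledge()` methods only imitate the roles of the methods named in the commit. It assumes, as the comment states, that acknowledging refuses to drive the size below zero and throws instead.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a FlowFile queue; NOT the real NiFi implementation.
class CountedQueue {
    private final Queue<String> items = new ArrayDeque<>();
    private final AtomicInteger size = new AtomicInteger(0);

    // Plays the role of adjustSize(): changes the reported size by delta.
    void adjustSize(int delta) {
        size.addAndGet(delta);
    }

    // Makes an item visible to consumers.
    synchronized void put(String item) {
        items.add(item);
    }

    // Hands an item to a consumer, or null if none is available.
    synchronized String poll() {
        return items.poll();
    }

    // Plays the role of acknowledge(): decrements the size, but throws
    // rather than letting the size become negative (size is left unchanged).
    void acknowledge() {
        size.updateAndGet(current -> {
            if (current - 1 < 0) {
                throw new IllegalStateException("Cannot set queue size to " + (current - 1));
            }
            return current - 1;
        });
    }

    int size() {
        return size.get();
    }
}

class QueueSizeOrderingDemo {
    // Old order: the item is visible before the size accounts for it.
    static int putThenAdjust() {
        CountedQueue queue = new CountedQueue();
        queue.put("flowfile");      // "Thread 1": item visible, size still 0
        queue.poll();               // "Thread 2": consumer obtains the item
        try {
            queue.acknowledge();    // "Thread 2": would go to -1, so this throws
        } catch (IllegalStateException e) {
            // Size remains 0, but the item has already left the queue.
        }
        queue.adjustSize(1);        // "Thread 1": late increment
        return queue.size();        // Reported size for an empty queue
    }

    // New order: the size is adjusted before the item becomes visible.
    static int adjustThenPut() {
        CountedQueue queue = new CountedQueue();
        queue.adjustSize(1);
        queue.put("flowfile");
        queue.poll();
        queue.acknowledge();        // 1 -> 0, no exception
        return queue.size();
    }

    public static void main(String[] args) {
        System.out.println(putThenAdjust());  // prints 1 (stale size, queue is empty)
        System.out.println(adjustThenPut());  // prints 0
    }
}
```

With the size adjusted first, a consumer can never acknowledge an item whose count has not yet been recorded, which is the ordering this commit enforces.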
