This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new fea17d0 NIFI-5919: Addressed a race condition that can exist if
adding FlowFiles to a FlowFileQueue before adjusting the size of the queue to
account for the FlowFiles
fea17d0 is described below
commit fea17d0ca875d2450c21b69dc0c6c241374834e5
Author: Mark Payne <[email protected]>
AuthorDate: Fri Dec 28 10:35:48 2018 -0500
NIFI-5919: Addressed a race condition that can exist if adding FlowFiles to
a FlowFileQueue before adjusting the size of the queue to account for the
FlowFiles
Signed-off-by: Pierre Villard <[email protected]>
This closes #3238.
---
.../clustered/SocketLoadBalancedFlowFileQueue.java | 20 +++++++++++++++++++-
1 file changed, 19 insertions(+), 1 deletion(-)
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java
index 84731f7..353af49 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java
@@ -769,8 +769,26 @@ public class SocketLoadBalancedFlowFileQueue extends
AbstractFlowFileQueue imple
putAll(flowFiles);
} else {
logger.debug("Received the following FlowFiles from Peer: {}.
Will accept FlowFiles to the local partition", flowFiles);
- localPartition.putAll(flowFiles);
+
+ // As explained in the putAllAndGetPartitions() method, we
must ensure that we call adjustSize() before we
+ // put the FlowFiles on the queue. Otherwise, we will
encounter a race condition. Specifically, that race condition
+ // can play out like so:
+ //
+ // Thread 1: Call localPartition.putAll() when the queue is
empty (has a queue size of 0) but has not yet adjusted the size.
+ // Thread 2: Call poll() to obtain the FlowFile just received.
+ // Thread 2: Transfer the FlowFile to some Relationship
+ // Thread 2: Commit the session, which will call acknowledge
on this queue.
+ // Thread 2: The acknowledge() method attempts to decrement
the size of the queue to -1.
+ // This causes an Exception to be thrown and the
queue size to remain at 0.
+ // However, the FlowFile has already been
successfully transferred to the next Queue.
+ // Thread 1: Call adjustSize() to increment the size of the
queue to 1 FlowFile.
+ //
+ // In this scenario, we now have no FlowFiles in the queue.
However, the queue size is set to 1.
+ // We can avoid this race condition by simply ensuring that we
call adjustSize() before making the FlowFiles
+ // available on the queue. This way, we cannot possibly obtain
the FlowFiles and process/acknowledge them before the queue
+ // size has been updated to account for them and therefore we
will not attempt to assign a negative queue size.
adjustSize(flowFiles.size(),
flowFiles.stream().mapToLong(FlowFileRecord::getSize).sum());
+ localPartition.putAll(flowFiles);
}
} finally {
partitionReadLock.unlock();