asfgit closed pull request #3238: NIFI-5919: Addressed a race condition that
can exist if adding FlowFi…
URL: https://github.com/apache/nifi/pull/3238
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java
index 84731f769e..353af49f2c 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/SocketLoadBalancedFlowFileQueue.java
@@ -769,8 +769,26 @@ public void receiveFromPeer(final Collection<FlowFileRecord> flowFiles) {
putAll(flowFiles);
} else {
logger.debug("Received the following FlowFiles from Peer: {}. Will accept FlowFiles to the local partition", flowFiles);
- localPartition.putAll(flowFiles);
+
+ // As explained in the putAllAndGetPartitions() method, we must ensure that we call adjustSize() before we
+ // put the FlowFiles on the queue. Otherwise, we will encounter a race condition. Specifically, that race condition
+ // can play out like so:
+ //
+ // Thread 1: Call localPartition.putAll() when the queue is empty (has a queue size of 0) but has not yet adjusted the size.
+ // Thread 2: Call poll() to obtain the FlowFile just received.
+ // Thread 2: Transfer the FlowFile to some Relationship
+ // Thread 2: Commit the session, which will call acknowledge on this queue.
+ // Thread 2: The acknowledge() method attempts to decrement the size of the queue to -1.
+ //           This causes an Exception to be thrown and the queue size to remain at 0.
+ //           However, the FlowFile has already been successfully transferred to the next Queue.
+ // Thread 1: Call adjustSize() to increment the size of the queue to 1 FlowFile.
+ //
+ // In this scenario, we now have no FlowFiles in the queue. However, the queue size is set to 1.
+ // We can avoid this race condition by simply ensuring that we call adjustSize() before making the FlowFiles
+ // available on the queue. This way, we cannot possibly obtain the FlowFiles and process/acknowledge them before the queue
+ // size has been updated to account for them and therefore we will not attempt to assign a negative queue size.
adjustSize(flowFiles.size(), flowFiles.stream().mapToLong(FlowFileRecord::getSize).sum());
+ localPartition.putAll(flowFiles);
}
} finally {
partitionReadLock.unlock();
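
The interleaving described in the comment above can be replayed in a small standalone sketch. This is not NiFi code: SizeTrackedQueue, QueueSizeRaceSketch and their methods are hypothetical names that only mirror the roles of the queue, adjustSize() and acknowledge() in the diff. The two "threads" are replayed sequentially in the problematic order so that the outcome is deterministic, and the sketch assumes, as the comment states, that a decrement below zero throws and leaves the size unchanged.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical stand-in for a queue that tracks its size separately from its contents.
// None of these names come from NiFi; they only mirror the roles described in the diff.
class SizeTrackedQueue {
    private final Deque<String> items = new ArrayDeque<>();
    private int size = 0;

    // Applies a delta to the tracked size. A negative result is rejected
    // and the tracked size is left unchanged.
    synchronized void adjustSize(int delta) {
        if (size + delta < 0) {
            throw new IllegalStateException("Queue size cannot be negative");
        }
        size += delta;
    }

    synchronized void put(String item) { items.add(item); }

    synchronized String poll() { return items.poll(); }

    // Called once a polled item has been handed off downstream.
    void acknowledge() { adjustSize(-1); }

    synchronized int trackedSize() { return size; }

    synchronized int actualCount() { return items.size(); }
}

class QueueSizeRaceSketch {
    // Old ordering: the item becomes visible before the size is adjusted.
    // Returns {trackedSize, actualCount} after the interleaving completes.
    static int[] putBeforeAdjust() {
        SizeTrackedQueue queue = new SizeTrackedQueue();
        queue.put("flowfile-1");      // "Thread 1": item visible, tracked size still 0
        queue.poll();                 // "Thread 2": obtains the item
        try {
            queue.acknowledge();      // "Thread 2": tries to move the size to -1
        } catch (IllegalStateException e) {
            // The decrement is rejected, so the tracked size stays at 0.
        }
        queue.adjustSize(1);          // "Thread 1": late increment
        return new int[] { queue.trackedSize(), queue.actualCount() };
    }

    // New ordering: the size is adjusted before the item becomes visible,
    // so no consumer can acknowledge an item the size does not yet cover.
    static int[] adjustBeforePut() {
        SizeTrackedQueue queue = new SizeTrackedQueue();
        queue.adjustSize(1);
        queue.put("flowfile-1");
        queue.poll();
        queue.acknowledge();
        return new int[] { queue.trackedSize(), queue.actualCount() };
    }

    public static void main(String[] args) {
        int[] oldOrder = putBeforeAdjust();
        int[] newOrder = adjustBeforePut();
        System.out.println("put then adjust: tracked=" + oldOrder[0] + ", actual=" + oldOrder[1]);
        System.out.println("adjust then put: tracked=" + newOrder[0] + ", actual=" + newOrder[1]);
    }
}
```

With the old ordering the sketch ends with a tracked size of 1 and an empty queue, which is the mismatch the comment describes. With the new ordering both values end at 0.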