René Zeidler created NIFI-13687:
-----------------------------------
Summary: Race condition with "Batch Output" mode allows FlowFiles
from the same batch to get separated
Key: NIFI-13687
URL: https://issues.apache.org/jira/browse/NIFI-13687
Project: Apache NiFi
Issue Type: Bug
Components: Core Framework
Affects Versions: 2.0.0-M4, 1.27.0
Reporter: René Zeidler
Attachments: Batch_Output_Bug_Flow.json, batch-output-expected.log,
batch-output-race-condition-A.log, batch-output-race-condition-B.log,
image-2024-08-28-12-31-07-244.png, image-2024-08-28-12-31-40-357.png
h2. Summary
There is a race condition when using "Batch Output" in conjunction with "Single
Batch Per Node". The process group allows the output valve to get opened while
FlowFiles are still being read from the inputs. This allows some FlowFiles from
the same batch to be transferred out of the process group early, separating
them from the batch.
The race condition can occur when an input port is connected directly to an
output port within the affected process group (see example below).
h2. Example
I simplified my use case to the following flow (that is attached in
"Batch_Output_Bug_Flow.json"):
!image-2024-08-28-12-31-07-244.png!
The input is one FlowFile that represents a batch of work. In the first process
group I split the FlowFile into its parts, but also keep the original batch
FlowFile around because I need it later. The parts and original are transferred
via different ports.
In the second group I process the parts. I still want to keep the original
batch file around so I pass it through the same process group unmodified. The
"Single Batch per Node" and "Batch Output" settings of the process group should
keep all the FlowFiles together. This is where the bug occurs, because
sometimes the batch file is transferred in and out before the parts are
transferred in.
In the third process group I want to re-merge the processed parts with the
original batch file, which is why I need to bring it along. This works fine as
long as the batch file and parts are processed together as a batch, but
suddenly fails if the batch file is transferred before the part files.
This is what the second process group "Process parts" looks like:
!image-2024-08-28-12-31-40-357.png!
h3. Reproduction steps
# Import the attached flow and start all processors/process groups except
the first one ("Generate batch input").
# Run the input processor once.
# Immediately refresh the UI.
{_}Expected{_}: 11 FlowFiles are queued in "Process parts"
{_}Race condition A{_}: 10 FlowFiles are queued in "Process parts", 1 is queued
in "Re-merge batch"
{_}Race condition B{_}: 10 FlowFiles are queued in "Process parts", 1 is queued
in the "Batch" connection between "Process parts" and "Re-merge batch"
# Wait 7 seconds for the flow to complete.
# Refresh the UI.
{_}Expected{_}: 1 FlowFile is in the final queue ("Processed batch").
{_}Race condition A{_}: 2 FlowFiles are in the final queue (none of which
contain the expected contents).
{_}Race condition B{_}: 1 FlowFile is in the final queue (the final output is
unaffected by race condition B in this example, but the behavior is still
incorrect).
Because this is a race condition, it does not reproduce reliably. If you don't
see the race condition, clear the final queue and try again from step 2. It can
take me 10 or so tries to see the bug. The likelihood of the race condition may
be influenced by outside factors, such as other flows/processes running on the
node, but I haven't investigated that.
h2. Log
I've enabled TRACE level logging for {{org.apache.nifi.groups}} and
{{org.apache.nifi.connectable}} to diagnose the race condition. The full logs
for the expected case, race condition A, and race condition B are attached.
The important events from the logs are summarized in the table below:
||Race condition A (batch-output-race-condition-A.log)|| || || ||Race condition B (batch-output-race-condition-B.log)|| || || ||Expected (batch-output-expected.log)|| || || ||
||Log line||Process group||Port||State||Log line||Process group||Port||State||Log line||Process group||Port||State||
|7|1|IN|OPEN|8|1|IN|OPEN|11|1|IN|OPEN|
|10|1|IN|CLOSE|11|1|IN|CLOSE|14|1|IN|CLOSE|
|13|1|OUT|OPEN|14|1|OUT|OPEN|18|1|OUT|OPEN|
|19|1|OUT|CLOSE|22|1|OUT|CLOSE|24|1|OUT|CLOSE|
|20|2|IN|OPEN|23|2|IN|OPEN|25|2|IN|OPEN|
|*{color:#ff0000}26{color}*|*{color:#ff0000}2{color}*|*{color:#ff0000}OUT{color}*|*{color:#ff0000}OPEN{color}*|*{color:#ff0000}29{color}*|*{color:#ff0000}2{color}*|*{color:#ff0000}OUT{color}*|*{color:#ff0000}OPEN{color}*| | | | |
|{color:#ff8b00}28{color}|{color:#ff8b00}2{color}|{color:#ff8b00}OUT{color}|{color:#ff8b00}CLOSE{color}|{color:#ff8b00}2 OUT stays OPEN{color}| | | | | | | |
|32|2|IN|CLOSE|36|2|IN|CLOSE|33|2|IN|CLOSE|
|{color:#ff8b00}34{color}|{color:#ff8b00}3{color}|{color:#ff8b00}IN{color}|{color:#ff8b00}OPEN{color}| | | | | | | | |
|{color:#ff8b00}37{color}|{color:#ff8b00}3{color}|{color:#ff8b00}IN{color}|{color:#ff8b00}CLOSE{color}| | | | | | | | |
|{color:#ff8b00}41{color}|{color:#ff8b00}3{color}|{color:#ff8b00}OUT{color}|{color:#ff8b00}OPEN{color}| | | | | | | | |
|{color:#ff8b00}43{color}|{color:#ff8b00}3{color}|{color:#ff8b00}OUT{color}|{color:#ff8b00}CLOSE{color}| | | | | | | | |
|47|2|OUT|OPEN|{color:#ff8b00}2 OUT was already OPEN{color}| | | |670|2|OUT|OPEN|
|49|2|OUT|CLOSE|1244|2|OUT|CLOSE|676|2|OUT|CLOSE|
|50|3|IN|OPEN|1245|3|IN|OPEN|677|3|IN|OPEN|
|53|3|IN|CLOSE|1253|3|IN|CLOSE|685|3|IN|CLOSE|
|57|3|OUT|OPEN|1257|3|OUT|OPEN|689|3|OUT|OPEN|
|59|3|OUT|CLOSE|1259|3|OUT|CLOSE|691|3|OUT|CLOSE|
The bold, red row is where the bug happens. The output valve for process group
2 is opened while its input valve is still open. This should not be allowed for
process groups with "Batch Output" and "Single Batch per Node".
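The invariant described above can be checked mechanically against a summarized event list. The following is a minimal sketch, not NiFi code: {{ValveEventCheck}}, {{Event}} and {{firstViolation}} are hypothetical names, and it assumes every process group in the list uses "Batch Output" with "Single Batch per Node". The sample events are taken from the first rows of the table.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical helper (not part of NiFi): scans summarized valve events for the violation. */
final class ValveEventCheck {

    /** One summarized event: log line, process group, port ("IN"/"OUT"), state ("OPEN"/"CLOSE"). */
    record Event(int logLine, int group, String port, String state) {}

    /**
     * Returns the log line of the first OUT/OPEN event for a group whose
     * input valve is still open, or -1 if no such event exists.
     */
    static int firstViolation(List<Event> events) {
        Set<Integer> inputOpen = new HashSet<>();
        for (Event e : events) {
            boolean opening = e.state().equals("OPEN");
            if (e.port().equals("IN")) {
                // Track which groups currently have an open input valve.
                if (opening) {
                    inputOpen.add(e.group());
                } else {
                    inputOpen.remove(e.group());
                }
            } else if (opening && inputOpen.contains(e.group())) {
                // Output valve opened while the input valve is still open.
                return e.logLine();
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // Leading rows of the "Race condition A" columns.
        List<Event> raceA = List.of(
                new Event(7, 1, "IN", "OPEN"), new Event(10, 1, "IN", "CLOSE"),
                new Event(13, 1, "OUT", "OPEN"), new Event(19, 1, "OUT", "CLOSE"),
                new Event(20, 2, "IN", "OPEN"), new Event(26, 2, "OUT", "OPEN"),
                new Event(28, 2, "OUT", "CLOSE"), new Event(32, 2, "IN", "CLOSE"));
        // Leading rows of the "Expected" columns.
        List<Event> expected = List.of(
                new Event(11, 1, "IN", "OPEN"), new Event(14, 1, "IN", "CLOSE"),
                new Event(18, 1, "OUT", "OPEN"), new Event(24, 1, "OUT", "CLOSE"),
                new Event(25, 2, "IN", "OPEN"), new Event(33, 2, "IN", "CLOSE"),
                new Event(670, 2, "OUT", "OPEN"), new Event(676, 2, "OUT", "CLOSE"));
        System.out.println(firstViolation(raceA));    // prints 26
        System.out.println(firstViolation(expected)); // prints -1
    }
}
```

For the race condition A events the sketch reports log line 26, which is the bold, red row; for the expected sequence it reports no violation.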
h2. Code pointers
The main check for process groups with "Batch Output" - that doesn't allow
FlowFiles to leave until the whole batch has been processed - happens here:
[https://github.com/apache/nifi/blob/5440c6d405643b627c48870acda1298961d4b1b2/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/connectable/LocalPort.java#L199-L220]
Additional checking happens here:
[https://github.com/apache/nifi/blob/5440c6d405643b627c48870acda1298961d4b1b2/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardDataValve.java#L190-L223]
This is also where it's possible to check whether the input valve is still open
({{groupsWithDataFlowingIn.contains(sourceGroup.getIdentifier())}}).
A check should be added somewhere that prevents the output valve of a process
group in "Batch Output" mode from being opened while its input valve is still
open.
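To illustrate the kind of guard meant here, below is a simplified stand-alone model. It is a sketch under assumptions, not the actual {{StandardDataValve}}: the class {{BatchValveSketch}}, its method names and the {{batchOutput}} flag are hypothetical, and only the set name {{groupsWithDataFlowingIn}} is borrowed from the linked code.

```java
import java.util.HashSet;
import java.util.Set;

/** Simplified, hypothetical model of a data valve; not the real NiFi implementation. */
final class BatchValveSketch {
    // IDs of process groups whose input valve is currently open.
    private final Set<String> groupsWithDataFlowingIn = new HashSet<>();
    // IDs of process groups whose output valve is currently open.
    private final Set<String> groupsWithDataFlowingOut = new HashSet<>();

    synchronized void openFlowIntoGroup(String groupId) {
        groupsWithDataFlowingIn.add(groupId);
    }

    synchronized void closeFlowIntoGroup(String groupId) {
        groupsWithDataFlowingIn.remove(groupId);
    }

    /**
     * Tries to open the output valve. For a group in batch output mode the
     * request is refused while the group's input valve is still open. The
     * check and the update happen under the same lock, so no input can be
     * opened between them.
     */
    synchronized boolean tryOpenFlowOutOfGroup(String groupId, boolean batchOutput) {
        if (batchOutput && groupsWithDataFlowingIn.contains(groupId)) {
            return false; // proposed guard: the batch is still being read in
        }
        groupsWithDataFlowingOut.add(groupId);
        return true;
    }

    synchronized void closeFlowOutOfGroup(String groupId) {
        groupsWithDataFlowingOut.remove(groupId);
    }

    public static void main(String[] args) {
        BatchValveSketch valve = new BatchValveSketch();
        valve.openFlowIntoGroup("group-2");
        System.out.println(valve.tryOpenFlowOutOfGroup("group-2", true)); // prints false
        valve.closeFlowIntoGroup("group-2");
        System.out.println(valve.tryOpenFlowOutOfGroup("group-2", true)); // prints true
    }
}
```

Where such a check belongs in the real code, and how it should interact with the existing checks in {{LocalPort}}, is left open.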