René Zeidler created NIFI-13687:
-----------------------------------

             Summary: Race condition with "Batch Output" mode allows FlowFiles 
from the same batch to get separated
                 Key: NIFI-13687
                 URL: https://issues.apache.org/jira/browse/NIFI-13687
             Project: Apache NiFi
          Issue Type: Bug
          Components: Core Framework
    Affects Versions: 2.0.0-M4, 1.27.0
            Reporter: René Zeidler
         Attachments: Batch_Output_Bug_Flow.json, batch-output-expected.log, 
batch-output-race-condition-A.log, batch-output-race-condition-B.log, 
image-2024-08-28-12-31-07-244.png, image-2024-08-28-12-31-40-357.png

h2. Summary

There is a race condition when using "Batch Output" in conjunction with "Single 
Batch Per Node". The process group allows the output valve to get opened while 
FlowFiles are still being read from the inputs. This allows some FlowFiles from 
the same batch to be transferred out of the process group early, separating 
them from the batch.

The race condition can occur when an input port is connected directly to an 
output port within the affected process group (see example below).
h2. Example

I simplified my use case to the following flow (that is attached in 
"Batch_Output_Bug_Flow.json"):

!image-2024-08-28-12-31-07-244.png!

The input is one FlowFile that represents a batch of work. In the first process 
group I split the FlowFile into its parts, but also keep the original batch 
FlowFile around because I need it later. The parts and original are transferred 
via different ports.

In the second group I process the parts. I still want to keep the original 
batch file around so I pass it through the same process group unmodified. The 
"Single Batch per Node" and "Batch Output" settings of the process group should 
keep all the FlowFiles together. This is where the bug occurs, because 
sometimes the batch file is transferred in and out before the parts are 
transferred in.

In the third process group I want to re-merge the processed parts with the 
original batch file, which is why I need to bring it along. This works fine as 
long as the batch file and parts are processed together as a batch, but 
suddenly fails if the batch file is transferred before the part files.

This is what the second process group "Process parts" looks like:

!image-2024-08-28-12-31-40-357.png!
h3. Reproduction steps
 # Import the attached flow and start all processors/process groups except 
the first one ("Generate batch input").
 # Run the input processor once.
 # Immediately refresh the UI.
{_}Expected{_}: 11 FlowFiles are queued in "Process parts"
{_}Race condition A{_}: 10 FlowFiles are queued in "Process parts", 1 is queued 
in "Re-merge batch"
{_}Race condition B{_}: 10 FlowFiles are queued in "Process parts", 1 is queued 
in the "Batch" connection between "Process parts" and "Re-merge batch"
 # Wait 7 seconds for the flow to complete.
 # Refresh the UI.
{_}Expected{_}: 1 FlowFile is in the final queue ("Processed batch").
{_}Race condition A{_}: 2 FlowFiles are in the final queue (none of which 
contain the expected contents).
{_}Race condition B{_}: 1 FlowFile is in the final queue (the final output is 
unaffected by race condition B in this example, but the behavior is still 
incorrect).

Because this is a race condition, it cannot be reproduced reliably. If you 
don't see the race condition, clear the final queue and try again from step 2. 
It can take me 10 or so tries to see the bug. The likelihood of the race 
condition may be influenced by outside factors, such as other flows/processes 
running on the node, but I haven't investigated that.
h2. Log

I've enabled TRACE level logging for {{org.apache.nifi.groups}} and 
{{org.apache.nifi.connectable}} to diagnose the race condition. The full logs 
for the expected case, race condition A, and race condition B are attached.

The important events from the logs are summarized in the table below:
||Race condition A (batch-output-race-condition-A.log)|| || || ||Race condition B (batch-output-race-condition-B.log)|| || || ||Expected (batch-output-expected.log)|| || || ||
||Log line||Process group||Port||State||Log line||Process group||Port||State||Log line||Process group||Port||State||
|7|1|IN|OPEN|8|1|IN|OPEN|11|1|IN|OPEN|
|10|1|IN|CLOSE|11|1|IN|CLOSE|14|1|IN|CLOSE|
|13|1|OUT|OPEN|14|1|OUT|OPEN|18|1|OUT|OPEN|
|19|1|OUT|CLOSE|22|1|OUT|CLOSE|24|1|OUT|CLOSE|
|20|2|IN|OPEN|23|2|IN|OPEN|25|2|IN|OPEN|
|*{color:#ff0000}26{color}*|*{color:#ff0000}2{color}*|*{color:#ff0000}OUT{color}*|*{color:#ff0000}OPEN{color}*|*{color:#ff0000}29{color}*|*{color:#ff0000}2{color}*|*{color:#ff0000}OUT{color}*|*{color:#ff0000}OPEN{color}*| | | | |
|{color:#ff8b00}28{color}|{color:#ff8b00}2{color}|{color:#ff8b00}OUT{color}|{color:#ff8b00}CLOSE{color}|{color:#ff8b00}2 OUT stays OPEN{color}| | | | | | | |
|32|2|IN|CLOSE|36|2|IN|CLOSE|33|2|IN|CLOSE|
|{color:#ff8b00}34{color}|{color:#ff8b00}3{color}|{color:#ff8b00}IN{color}|{color:#ff8b00}OPEN{color}| | | | | | | | |
|{color:#ff8b00}37{color}|{color:#ff8b00}3{color}|{color:#ff8b00}IN{color}|{color:#ff8b00}CLOSE{color}| | | | | | | | |
|{color:#ff8b00}41{color}|{color:#ff8b00}3{color}|{color:#ff8b00}OUT{color}|{color:#ff8b00}OPEN{color}| | | | | | | | |
|{color:#ff8b00}43{color}|{color:#ff8b00}3{color}|{color:#ff8b00}OUT{color}|{color:#ff8b00}CLOSE{color}| | | | | | | | |
|47|2|OUT|OPEN|{color:#ff8b00}2 OUT was already OPEN{color}| | | |670|2|OUT|OPEN|
|49|2|OUT|CLOSE|1244|2|OUT|CLOSE|676|2|OUT|CLOSE|
|50|3|IN|OPEN|1245|3|IN|OPEN|677|3|IN|OPEN|
|53|3|IN|CLOSE|1253|3|IN|CLOSE|685|3|IN|CLOSE|
|57|3|OUT|OPEN|1257|3|OUT|OPEN|689|3|OUT|OPEN|
|59|3|OUT|CLOSE|1259|3|OUT|CLOSE|691|3|OUT|CLOSE|

The bold, red row is where the bug happens. The output valve for process group 
2 is opened while its input valve is still open. This should not be allowed for 
process groups with "Batch Output" and "Single Batch per Node".
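
The invariant described above can be checked mechanically against a sequence of valve events. The following is only a minimal sketch: the {{ValveEventCheck}} class, its {{Event}} type and the {{firstViolation}} method are hypothetical names made up for illustration, and the events would have to be extracted from the TRACE logs by hand or by a separate parser. It assumes every group in the sequence uses "Batch Output".

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper, not part of NiFi: replays valve events in log order
// and reports where a group's output valve opens while its input valve is open.
class ValveEventCheck {

    /** One table row: process group number, port ("IN"/"OUT"), state ("OPEN"/"CLOSE"). */
    static final class Event {
        final int group;
        final String port;
        final String state;

        Event(int group, String port, String state) {
            this.group = group;
            this.port = port;
            this.state = state;
        }
    }

    /**
     * Returns the index of the first event that opens an output valve while
     * the same group's input valve is still open, or -1 if there is none.
     */
    static int firstViolation(List<Event> events) {
        Set<Integer> inputOpen = new HashSet<>(); // groups whose input valve is open
        for (int i = 0; i < events.size(); i++) {
            Event e = events.get(i);
            if (e.port.equals("IN")) {
                if (e.state.equals("OPEN")) {
                    inputOpen.add(e.group);
                } else {
                    inputOpen.remove(e.group);
                }
            } else if (e.state.equals("OPEN") && inputOpen.contains(e.group)) {
                return i; // OUT opened while IN is still open
            }
        }
        return -1;
    }
}
```

Fed the first six events of race condition A from the table (1 IN OPEN, 1 IN CLOSE, 1 OUT OPEN, 1 OUT CLOSE, 2 IN OPEN, 2 OUT OPEN), the check flags the last one. The corresponding events of the expected case, where 2 IN CLOSE precedes 2 OUT OPEN, pass.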
h2. Code pointers

The main check for process groups with "Batch Output" - that doesn't allow 
FlowFiles to leave until the whole batch has been processed - happens here:

[https://github.com/apache/nifi/blob/5440c6d405643b627c48870acda1298961d4b1b2/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/connectable/LocalPort.java#L199-L220]

Additional checking happens here:

[https://github.com/apache/nifi/blob/5440c6d405643b627c48870acda1298961d4b1b2/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardDataValve.java#L190-L223]

This is also where it's possible to check whether the input valve is still open 
({{groupsWithDataFlowingIn.contains(sourceGroup.getIdentifier())}}).

A check should be added for process groups in "Batch Output" mode that prevents 
the output valve from being opened while the input valve is still open.
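
To illustrate the idea, here is a rough sketch of such a guard in a deliberately simplified, stand-alone valve model. This is not the code of {{StandardDataValve}}: the {{BatchValveSketch}} class, its method names and the {{batchOutput}} parameter are assumptions made for illustration; only the {{groupsWithDataFlowingIn}} name is borrowed from the check quoted above. The real fix would need to fit the existing locking and policy handling.

```java
import java.util.HashSet;
import java.util.Set;

// Simplified, hypothetical model of a data valve; NOT NiFi's StandardDataValve.
class BatchValveSketch {
    // IDs of process groups whose input valve is currently open
    private final Set<String> groupsWithDataFlowingIn = new HashSet<>();
    // IDs of process groups whose output valve is currently open
    private final Set<String> groupsWithDataFlowingOut = new HashSet<>();

    synchronized boolean tryOpenFlowIntoGroup(String groupId) {
        // Single batch per node: no new input while the previous batch is leaving
        if (groupsWithDataFlowingOut.contains(groupId)) {
            return false;
        }
        groupsWithDataFlowingIn.add(groupId);
        return true;
    }

    synchronized void closeFlowIntoGroup(String groupId) {
        groupsWithDataFlowingIn.remove(groupId);
    }

    synchronized boolean tryOpenFlowOutOfGroup(String groupId, boolean batchOutput) {
        // Proposed guard: in "Batch Output" mode, refuse to open the output
        // valve while FlowFiles are still being transferred into the group
        if (batchOutput && groupsWithDataFlowingIn.contains(groupId)) {
            return false;
        }
        groupsWithDataFlowingOut.add(groupId);
        return true;
    }

    synchronized void closeFlowOutOfGroup(String groupId) {
        groupsWithDataFlowingOut.remove(groupId);
    }
}
```

Because both checks run under the same lock in this model, the output valve cannot open between the input valve opening and closing, which is the window that the bold, red row in the table shows being hit.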



