Neil Kolban created NIFI-7045:
---------------------------------

             Summary: EnforceOrder issue with Wait Timeout / Inactive Timeout
                 Key: NIFI-7045
                 URL: https://issues.apache.org/jira/browse/NIFI-7045
             Project: Apache NiFi
          Issue Type: Bug
          Components: Core Framework
    Affects Versions: 1.10.0
            Reporter: Neil Kolban


Consider the EnforceOrder processor.  Now imagine it is given 3 incoming 
FlowFiles with a group identifier of "XYZ" and Order Attribute with values 0, 1 
and 2.  These FlowFiles are passed through and all is well.

Now think about the Wait Timeout and Inactive Timeout.  Let us now imagine that 
Inactive Timeout is set to 30 seconds.  What this means is that 30 seconds 
after the last FlowFile in a group has been seen, EnforceOrder will forget that 
it has seen such FlowFiles before.  Again, this is exactly what we want.  
However ... that is not what happens.

Suppose we send in 3 FlowFiles with group identifier "XYZ" and Order Attribute 
values 0, 1 and 2, then send nothing further for, say, a minute, and then send 
one more FlowFile with group identifier "XYZ" and Order Attribute 0.  We 
expect this new FlowFile to start a new sequence.  Unfortunately, what actually 
happens is that it is routed to the "skipped" relationship.  It is as though 
the Inactive Timeout never happened.

Looking at the source code here ...

[https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/EnforceOrder.java#L305]

I believe I see why.  It appears that we check and process the Inactive Timeout 
at the END of every new batch of messages received by EnforceOrder.  That gives 
us the following situation:

Group Id XYZ, Seq 0
Group Id XYZ, Seq 1
Group Id XYZ, Seq 2
... wait long past Inactive Timeout ...
Group Id XYZ, Seq 0  ... and at the END of processing this FlowFile ... THEN we 
flush the state because Inactive Timeout has been reached.

A wild guess at a solution would be to move the processing of timeouts to the 
start of the FlowFile processing.
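
To make the ordering argument concrete, here is a minimal toy model in Java.  
It is not the EnforceOrder source: the class, the field and method names, and 
the rule that only an in-order FlowFile refreshes a group's timestamp are all 
assumptions made for illustration.  It only shows how running the inactive 
check after routing, versus before it, changes where the late FlowFile goes.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model only; NOT the real EnforceOrder code. All names are hypothetical. */
public class InactiveTimeoutOrderDemo {

    static final long INACTIVE_TIMEOUT_MS = 30_000L;

    /** Per-group state: next expected order and the time it was last updated. */
    static final class GroupState {
        int target = 0;
        long updatedAt;
        GroupState(long now) { this.updatedAt = now; }
    }

    final Map<String, GroupState> states = new HashMap<>();
    final boolean expireFirst;

    InactiveTimeoutOrderDemo(boolean expireFirst) {
        this.expireFirst = expireFirst;
    }

    /** Drops every group that has been idle longer than the inactive timeout. */
    void cleanupInactiveStates(long now) {
        states.values().removeIf(s -> now - s.updatedAt > INACTIVE_TIMEOUT_MS);
    }

    /** Routes one FlowFile and returns the name of the relationship chosen. */
    String onTrigger(String groupId, int order, long now) {
        if (expireFirst) {
            cleanupInactiveStates(now);   // proposed: expire before routing
        }
        GroupState s = states.computeIfAbsent(groupId, k -> new GroupState(now));
        String relationship;
        if (order == s.target) {
            s.target++;
            s.updatedAt = now;            // assumption: only in-order arrivals refresh
            relationship = "success";
        } else if (order < s.target) {
            relationship = "skipped";
        } else {
            relationship = "wait";
        }
        if (!expireFirst) {
            cleanupInactiveStates(now);   // reported: expire after routing
        }
        return relationship;
    }

    /** Replays the scenario from this report; returns where the late FlowFile goes. */
    static String lateArrival(boolean expireFirst) {
        InactiveTimeoutOrderDemo p = new InactiveTimeoutOrderDemo(expireFirst);
        p.onTrigger("XYZ", 0, 0L);
        p.onTrigger("XYZ", 1, 1_000L);
        p.onTrigger("XYZ", 2, 2_000L);
        // One minute later, well past the 30 second Inactive Timeout.
        return p.onTrigger("XYZ", 0, 62_000L);
    }

    public static void main(String[] args) {
        System.out.println("cleanup at end:   " + lateArrival(false)); // skipped
        System.out.println("cleanup at start: " + lateArrival(true));  // success
    }
}
```

In this model, with cleanup at the end, the late FlowFile is compared against 
the stale target of 3 and is skipped, and the state is only flushed afterwards.  
With cleanup at the start, the stale state is gone before routing, so the same 
FlowFile begins a new sequence.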



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
