Koji Kawamura created NIFI-3452:
-----------------------------------

             Summary: Add Wait processor Wait Mode property
                 Key: NIFI-3452
                 URL: https://issues.apache.org/jira/browse/NIFI-3452
             Project: Apache NiFi
          Issue Type: Improvement
          Components: Extensions
    Affects Versions: 1.2.0
            Reporter: Koji Kawamura
            Assignee: Koji Kawamura


NiFi back pressure is handled per relationship: as long as a relationship 
has room to receive more FlowFiles, the source processor is scheduled to run.

However, this behavior is not ideal in some cases. For example, when there is 
a very computationally expensive task and the user wants to limit the number of 
FlowFiles that can be processed at a given time, it's not always possible to 
limit the rate with the existing ControlRate processor or the back-pressure 
mechanism.

As a more practical example, in the following flow, GetSQS is expected to be 
triggered only when the previous FlowFile has been processed completely.
Node 1 is parsing a FlowFile (indicated by the X in the connection between 
FetchS3Object and Parse). Both connections have a back-pressure threshold of 1, 
but because the object is already fetched, the first connection is empty and 
can thus be filled. This means that, if a new item becomes available in the 
queue, either of the following cases can happen with equal probability:

{code}
Case 1:
        ----------      -----------------      ---------
Node 1: | GetSQS | -X-> | FetchS3Object | -X-> | Parse |
        ----------      -----------------      ---------

        ----------      -----------------      ---------
Node 2: | GetSQS | ---> | FetchS3Object | ---> | Parse |
        ----------      -----------------      ---------

Case 2:
        ----------      -----------------      ---------
Node 1: | GetSQS | ---> | FetchS3Object | -X-> | Parse |
        ----------      -----------------      ---------

        ----------      -----------------      ---------
Node 2: | GetSQS | -X-> | FetchS3Object | ---> | Parse |
        ----------      -----------------      ---------

{code}

To achieve that, we could improve the Wait processor as follows.

The NiFi scheduler checks downstream relationship availability; when a 
relationship is full, the processor won't be scheduled to run. If a source 
processor has multiple outgoing relationships and ANY of them is full, the 
processor won't be scheduled.

(This is how processor scheduling works with back-pressure by default, but it 
can be altered with the @TriggerWhenAnyDestinationAvailable annotation. 
DistributeLoad is the only processor annotated with this.)
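
The scheduling rule described above can be sketched as a toy model. This is 
not NiFi code: the Connection class, the may_schedule function and the 
trigger_when_any_available flag are hypothetical names used only for 
illustration, and back-pressure is reduced to a simple object-count threshold.

```python
from dataclasses import dataclass


@dataclass
class Connection:
    """Toy stand-in for a connection with an object-count threshold."""
    threshold: int
    queued: int = 0

    def is_full(self) -> bool:
        return self.queued >= self.threshold


def may_schedule(outgoing, trigger_when_any_available=False):
    """Return True if the source processor is eligible to run.

    Default: every outgoing connection must have room.
    With the flag set (modelling @TriggerWhenAnyDestinationAvailable):
    one connection with room is enough.
    """
    has_room = [not c.is_full() for c in outgoing]
    if trigger_when_any_available:
        return any(has_room)
    return all(has_room)


to_fetch = Connection(threshold=1, queued=0)  # GetSQS -> FetchS3Object
to_wait = Connection(threshold=1, queued=1)   # GetSQS -> Wait, already full

print(may_schedule([to_fetch, to_wait]))                                   # False
print(may_schedule([to_fetch, to_wait], trigger_when_any_available=True))  # True
```

With the default rule, a single full connection is enough to hold the source 
processor back, which is the property the proposed flow relies on.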

We could use this mechanism to keep the source processor waiting to be 
scheduled, using the following flow:

{code}
GetSQS
  -- success --> FetchS3Object --> Parse --> Notify
  -- success --> Wait
{code}

To make it work as expected, we need to improve Wait so that the user can 
choose how a waiting FlowFile is handled, from either:
"Route to 'wait' relationship" or "Keep in the Upstream connection".
Currently the only option is to route to 'wait'.

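With the "Keep in the Upstream connection" Wait mode in the flow above,
the incoming FlowFile in the GetSQS -> Wait connection stays there until the 
actual data processing finishes and Notify sends a notification signal. 
Provided that connection has a back-pressure threshold of 1, it stays full in 
the meantime, so GetSQS is not scheduled again.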
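
To illustrate the difference between the two modes, here is a rough 
tick-based simulation of the flow above. It is a sketch under several 
assumptions, not NiFi behavior: the simulate function and the "route"/"keep" 
mode strings are hypothetical, FetchS3Object and Parse are collapsed into one 
work stage with no internal back-pressure, both connections out of GetSQS have 
a threshold of 1, and the 'wait' relationship always has room.

```python
THRESHOLD = 1  # assumed back-pressure threshold on both GetSQS connections


def simulate(wait_mode, messages=3, work_ticks=3, max_ticks=100):
    """Return the peak number of FlowFiles being worked on at once."""
    to_fetch = []    # GetSQS -> FetchS3Object connection
    to_wait = []     # GetSQS -> Wait connection
    working = {}     # FlowFile id -> ticks of Fetch/Parse work left
    signals = set()  # ids for which Notify has sent a signal
    emitted = finished = peak = 0

    for _ in range(max_ticks):
        if finished == messages:
            break
        # 1. GetSQS runs only if neither outgoing connection is full.
        if (emitted < messages
                and len(to_fetch) < THRESHOLD
                and len(to_wait) < THRESHOLD):
            to_fetch.append(emitted)
            to_wait.append(emitted)
            emitted += 1
        # 2. The work stage drains its queue and starts the expensive task.
        while to_fetch:
            working[to_fetch.pop(0)] = work_ticks
        peak = max(peak, len(working))
        # 3. Wait: "route" always moves the FlowFile out of the upstream
        #    connection; "keep" leaves it there until its signal arrives.
        for ff in list(to_wait):
            if wait_mode == "route" or ff in signals:
                to_wait.remove(ff)
        # 4. One tick of work; Notify signals when a FlowFile is done.
        for ff in list(working):
            working[ff] -= 1
            if working[ff] == 0:
                del working[ff]
                signals.add(ff)
                finished += 1
    return peak


print(simulate("route"))  # 3
print(simulate("keep"))   # 1
```

In this model, "route" empties the GetSQS -> Wait connection on every tick, so 
GetSQS keeps pulling messages, while "keep" holds the connection full until 
Notify fires and so limits the work stage to one FlowFile at a time.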



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)