Mark Payne created NIFI-8469:
--------------------------------

             Summary: Change ProcessSession so that commits are asynchronous
                 Key: NIFI-8469
                 URL: https://issues.apache.org/jira/browse/NIFI-8469
             Project: Apache NiFi
          Issue Type: New Feature
          Components: Core Framework, Extensions
            Reporter: Mark Payne


Currently, ProcessSession.commit() guarantees that when the method call 
returns, all FlowFiles have been persisted to the repositories. As such, 
it is safe to acknowledge/dispose of the data on the external system.

For example, a processor that consumes from JMS will consume the message, 
create a FlowFile from it, and then call ProcessSession.commit(). Only then is 
it safe to acknowledge the message on the JMS broker. If it is acknowledged 
prior to calling ProcessSession.commit(), and NiFi is restarted, the data may 
be lost. But after calling ProcessSession.commit(), it is safe because the 
content will be available to NiFi upon restart.

This API has served us well. However, lately there has also been a great deal 
of work and desire from the community to introduce other "runtimes." One of 
those is MiNiFi. Another is Stateless NiFi.

One of the distinguishing features of Stateless NiFi is that it stores content 
in-memory. This means that fast (and large) disks are not necessary, but it 
also means that upon restarting the application, all FlowFiles are lost. In 
order to provide data reliability, Stateless NiFi requires that the source of 
data be both reliable and replayable (like a JMS Broker or Apache Kafka, for 
instance). In this way, we can keep content in-memory by avoiding the message 
acknowledgment until after we've finished processing the message completely. If 
the application is restarted in the middle, the message will not have been 
acknowledged and as a result will be replayed, so we maintain our strong 
at-least-once guarantees.

Another distinguishing feature of Stateless NiFi is that it is single-threaded. 
This allows us to be far more scalable and consume fewer resources.

This works by allowing ProcessSession.commit() to enqueue data for the next 
Processor in the chain and then invoke the next Processor (recursively) before 
ProcessSession.commit() ever returns.

Unfortunately, though, some dataflows do not work well with such a model. Any 
flow that has MergeContent or MergeRecord in the middle will end up in a 
situation where the Processor never progresses. Take, for example, the following 
dataflow:

GetFile --> SplitText --> ReplaceText --> MergeContent --> PutS3Object

In this case, assume that SplitText splits an incoming FlowFile into 10 smaller 
FlowFiles. ReplaceText performs some manipulation. MergeContent is then 
expected to merge all 10 FlowFiles back into one.

However, because of the nature of how this works, after SplitText, the queue 
will have 10 FlowFiles. ReplaceText will then be called, which will consume one 
FlowFile, manipulate it, and call ProcessSession.commit(). This will then 
enqueue the FlowFile for MergeContent. MergeContent will be triggered but will 
be unable to make progress because it doesn't have enough FlowFiles. The only 
choice that the framework has is to then call MergeContent again until its 
entire queue is emptied, but the queue will never empty. As a result, the 
dataflow will end up in an infinite loop, calling MergeContent, which will make 
no progress.
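
The stall can be illustrated with a small simulation. This is only a sketch: the class, the queues and the constants are hypothetical stand-ins rather than NiFi classes, it assumes MergeContent is configured to wait for all 10 fragments, and the retry cap exists only so the demo terminates (the real loop would not).

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical simulation; none of these names are NiFi classes.
class DepthFirstStallDemo {
    static final int BIN_SIZE = 10;        // FlowFiles MergeContent needs before it can merge
    static final int MAX_RETRIES = 1_000;  // guard so the demo terminates

    // Returns the number of merged bins produced after ReplaceText handles one FlowFile.
    static int runDepthFirst() {
        Deque<String> replaceTextQueue = new ArrayDeque<>();
        Deque<String> mergeQueue = new ArrayDeque<>();
        for (int i = 0; i < BIN_SIZE; i++) {
            replaceTextQueue.add("split-" + i);  // output of SplitText
        }

        // ReplaceText consumes ONE FlowFile and commits. The commit hands the
        // FlowFile to MergeContent and triggers MergeContent before returning.
        mergeQueue.add(replaceTextQueue.poll().toUpperCase());

        int merged = 0;
        int attempts = 0;
        // The framework keeps triggering MergeContent until its queue is empty.
        while (!mergeQueue.isEmpty() && attempts < MAX_RETRIES) {
            attempts++;
            if (mergeQueue.size() >= BIN_SIZE) {  // never true: only 1 of 10 has arrived
                mergeQueue.clear();
                merged++;
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        System.out.println("merged bins: " + runDepthFirst());
    }
}
```

The other nine FlowFiles are still sitting in ReplaceText's queue, which is never serviced because control never returns from the commit.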

What we really want to do is to call ReplaceText repeatedly until its queue is 
empty and only then move on to the next Processor (MergeContent). 
Unfortunately, this can't really be accomplished with the current semantics, 
though. If we tried to do so, when ReplaceText is triggered the first time, and 
it calls ProcessSession.commit(), we would have two choices:
 * Recursively call ReplaceText.onTrigger(). This very quickly results in a 
StackOverflowError, so this approach doesn't work well.
 * Have ProcessSession.commit() block while another thread is responsible for 
calling ReplaceText.onTrigger(). This results in spawning a new thread for each 
FlowFile in the queue, which can very quickly exhaust the number of threads, 
leading to an OutOfMemoryError (or, even worse, depending on system/JVM 
settings, causing the entire operating system to crash).

So neither approach is viable.

Additionally, any dataflow that has a self-loop such as a failure loop has the 
same issue as above, resulting in a StackOverflowError.

The idea here, then, is to deprecate ProcessSession.commit() in favor of a new 
ProcessSession.commitAsync(). (Perhaps there will be a better name, but we'll 
refer to it as such for the time being). The differentiator here is that 
commitAsync() would allow for an optional callback method to be invoked after 
the session commit completes:
 * void commitAsync();
 * void commitAsync(Runnable successCallback);
 * void commitAsync(Consumer<Throwable> failureCallback);
 * void commitAsync(Runnable successCallback, Consumer<Throwable> 
failureCallback);
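
As a rough sketch, the four overloads could be expressed as one required method plus three convenience defaults. Everything here is an assumption for illustration: the interface name AsyncCommitSession, the use of null for "no callback", and the choice to rethrow when no failure callback is supplied. SynchronousSession is included only so the sketch can be exercised.

```java
import java.util.function.Consumer;

// Sketch only: the real ProcessSession has many more methods, and the final
// method name and signatures may differ from what is proposed here.
interface AsyncCommitSession {
    // The one method an engine must implement; either callback may be null.
    void commitAsync(Runnable successCallback, Consumer<Throwable> failureCallback);

    default void commitAsync() {
        commitAsync(null, null);
    }

    default void commitAsync(Runnable successCallback) {
        commitAsync(successCallback, null);
    }

    default void commitAsync(Consumer<Throwable> failureCallback) {
        commitAsync(null, failureCallback);
    }
}

// Hypothetical engine that commits synchronously and then runs the callback.
class SynchronousSession implements AsyncCommitSession {
    private final Runnable commit;  // stands in for the existing synchronous commit()

    SynchronousSession(Runnable commit) {
        this.commit = commit;
    }

    @Override
    public void commitAsync(Runnable successCallback, Consumer<Throwable> failureCallback) {
        try {
            commit.run();
        } catch (RuntimeException e) {
            if (failureCallback == null) {
                throw e;  // assumption: with no failure callback, the error propagates
            }
            failureCallback.accept(e);
            return;
        }
        if (successCallback != null) {
            successCallback.run();
        }
    }
}

class CommitAsyncDemo {
    public static void main(String[] args) {
        AsyncCommitSession session = new SynchronousSession(() -> { });
        session.commitAsync(() -> System.out.println("acknowledged"));
    }
}
```

A zero-argument lambda selects the Runnable overload and a one-argument lambda selects the Consumer<Throwable> overload, so the single-callback forms do not clash for lambda callers.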

Now, for most Processors, there is no need to call ProcessSession.commit() 
because the abstract parent class (AbstractProcessor) takes care of it. For those that do call 
ProcessSession.commit(), it is typically because they need to perform some 
cleanup action after the commit, such as in the JMS case illustrated above. In 
this case, the logic would be updated in order to perform the cleanup in the 
callback.

So, instead of using logic such as:
{code:java}
JMSMessage message = jmsConsumer.consume();
FlowFile flowFile = createFlowFile(message, session);
session.commit();
message.acknowledge();{code}
The logic should be something more akin to:
{code:java}
JMSMessage message = jmsConsumer.consume();
FlowFile flowFile = createFlowFile(message, session);
session.commitAsync(message::acknowledge);{code}
In the case of the traditional NiFi engine, {{void commitAsync(Runnable 
successCallback)}} would be implemented something like:
{code:java}
void commitAsync(Runnable successCallback) {
    commit();
    successCallback.run();
}{code}
However, Stateless (or any other engine that may be developed) could instead 
simply maintain a stack of callbacks and push the success callback onto that 
stack when session.commitAsync() is called.

Only when the entire dataflow has completed would it unwind the stack of 
callbacks.

With this approach, it means that in the dataflow described above, ReplaceText 
can be called continually until its queue is emptied. Then, the next Processor 
can be called continually until its queue is emptied. Even if the data has been 
merged, if the process dies or is restarted, the Success Callback of GetFile 
has not been triggered, so the data is still available.
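
A minimal simulation of that scheduling order, under the same caveats as any toy model: the class and queues below are hypothetical and not NiFi classes, ReplaceText is reduced to an upper-casing step, and MergeContent is assumed to need all 10 fragments.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical simulation; none of these names are NiFi classes.
class DrainQueueDemo {
    static final int BIN_SIZE = 10;  // FlowFiles MergeContent needs before it can merge

    static List<String> run() {
        Deque<String> replaceTextQueue = new ArrayDeque<>();
        for (int i = 0; i < BIN_SIZE; i++) {
            replaceTextQueue.add("split-" + i);  // output of SplitText
        }
        Deque<String> mergeQueue = new ArrayDeque<>();
        List<String> merged = new ArrayList<>();

        // ReplaceText is triggered repeatedly until its queue is empty.
        while (!replaceTextQueue.isEmpty()) {
            mergeQueue.add(replaceTextQueue.poll().toUpperCase());
        }

        // Only then is MergeContent triggered, so a full bin is available.
        while (mergeQueue.size() >= BIN_SIZE) {
            List<String> bin = new ArrayList<>();
            for (int i = 0; i < BIN_SIZE; i++) {
                bin.add(mergeQueue.poll());
            }
            merged.add(String.join(",", bin));
        }
        return merged;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Because no processor has to return from a commit before the next one runs, the loop is flat: there is no recursion and no extra thread per FlowFile.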

However, as soon as PutS3Object calls {{ProcessSession.commit()}} (because it 
is the last processor in the chain and it auto-terminates the FlowFile) the 
stack of callbacks can be unwound and called. As a result, GetFile's Success 
Callback is triggered only after successful completion of the entire dataflow. 
If any processor along the way rolls back the session or throws an uncaught 
Exception, the entire session is rolled back, ensuring no data loss.
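
The callback bookkeeping described above might look roughly like the following. This is a sketch under stated assumptions, not the Stateless implementation: CallbackStack and its method names are hypothetical, callbacks are unwound in last-in, first-out order because the text speaks of a stack, and failure callbacks are left out for brevity.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch of the bookkeeping a Stateless-style engine might do.
class CallbackStack {
    private final Deque<Runnable> successCallbacks = new ArrayDeque<>();

    // Called when a processor commits asynchronously: remember the callback, do not run it.
    void register(Runnable successCallback) {
        if (successCallback != null) {
            successCallbacks.push(successCallback);
        }
    }

    // Called once the whole dataflow has completed successfully.
    void unwind() {
        while (!successCallbacks.isEmpty()) {
            successCallbacks.pop().run();
        }
    }

    // Called on rollback or an uncaught exception: drop the callbacks so the
    // source is never told that the data was handled.
    void discard() {
        successCallbacks.clear();
    }

    int pending() {
        return successCallbacks.size();
    }
}

class CallbackStackDemo {
    public static void main(String[] args) {
        CallbackStack stack = new CallbackStack();
        stack.register(() -> System.out.println("GetFile: source data can now be removed"));
        System.out.println("pending callbacks: " + stack.pending());
        stack.unwind();  // runs only after the last processor has committed
    }
}
```

If the process dies before unwind() is reached, nothing has been acknowledged, so a replayable source will deliver the data again.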

There are a few other important considerations to take note of:
 * Because the StandardProcessSession.commitAsync() would behave as described 
above, just calling commit() and then any provided callbacks, the change in how 
traditional NiFi operates is quite minimal and therefore fairly low risk. The 
changes to Stateless NiFi are higher risk but Stateless is generally still 
considered somewhat experimental and expected to evolve pretty heavily.
 * This removes a very large barrier to entry for Stateless NiFi, which is that 
it's very difficult for users to know which flows can and cannot be used in 
Stateless. With these changes, Stateless should be able to run almost any flow 
that traditional NiFi can.
 * This will mean that processors should change to a new API, but for probably 
> 95% of processors, the changes necessary will be trivial.
 * The Mock Framework should be updated to cause a failure on a call to 
ProcessSession.commit() in order to ensure that the new commitAsync() is being 
used. The TestRunner should enable this by default but allow the requirement 
for commitAsync() to be disabled by calling something like 
{{TestRunner.allowSynchronousCommits()}}. Of course, this would only occur when 
users choose to upgrade to 1.14.0 (or whatever version this is released in) of 
the Mock Framework. Building against older versions of the Mock Framework would 
not expose this behavior.
 * If any Processor does still use {{ProcessSession.commit()}} and is run in 
Stateless NiFi, the Stateless engine will be required to then trigger the next 
Processor in the flow before returning from {{ProcessSession.commit()}}.
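
To make the Mock Framework idea concrete, here is a sketch of a session stand-in that rejects synchronous commits unless they are explicitly allowed. StrictMockSession is hypothetical and is not the real MockProcessSession or TestRunner API; allowSynchronousCommits() is simply the name floated above.

```java
// Hypothetical stand-in for a mock session; not the real Mock Framework API.
class StrictMockSession {
    private boolean synchronousCommitsAllowed = false;
    private int commits = 0;

    // Opt-out for processors that have not yet moved to commitAsync().
    void allowSynchronousCommits() {
        synchronousCommitsAllowed = true;
    }

    void commit() {
        if (!synchronousCommitsAllowed) {
            throw new AssertionError(
                "commit() was called; use commitAsync() or call allowSynchronousCommits()");
        }
        commits++;
    }

    void commitAsync(Runnable successCallback) {
        commits++;
        if (successCallback != null) {
            successCallback.run();
        }
    }

    int commitCount() {
        return commits;
    }
}

class StrictMockSessionDemo {
    public static void main(String[] args) {
        StrictMockSession session = new StrictMockSession();
        session.commitAsync(() -> System.out.println("async commit accepted"));
        try {
            session.commit();
        } catch (AssertionError e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Failing by default would surface every remaining commit() call the first time a processor's tests run against the new version of the framework.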



