Mark Payne created NIFI-8469:
--------------------------------
Summary: Change ProcessSession so that commits are asynchronous
Key: NIFI-8469
URL: https://issues.apache.org/jira/browse/NIFI-8469
Project: Apache NiFi
Issue Type: New Feature
Components: Core Framework, Extensions
Reporter: Mark Payne
Currently, ProcessSession.commit() guarantees that when the method call
returns, all FlowFiles have been persisted to the repositories. As such, it is
then safe to acknowledge or dispose of the data on the external system.
For example, a processor that consumes from JMS will consume the message,
create a FlowFile from it, and then call ProcessSession.commit(). Only then is
it safe to acknowledge the message on the JMS broker. If it is acknowledged
prior to calling ProcessSession.commit(), and NiFi is restarted, the data may
be lost. But after calling ProcessSession.commit(), it is safe because the
content will be available to NiFi upon restart.
This API has served us well. However, lately there has also been a great deal
of work and desire from the community to introduce other "runtimes." One of
those is MiNiFi. Another is Stateless NiFi.
One of the distinguishing features of Stateless NiFi is that it stores content
in-memory. This means that fast (and large) disks are not necessary, but it
also means that upon restarting the application, all FlowFiles are lost. In
order to provide data reliability, Stateless NiFi requires that the source of
data be both reliable and replayable (like a JMS Broker or Apache Kafka, for
instance). In this way, we can keep content in memory by deferring the message
acknowledgment until after we've finished processing the message completely. If
the application is restarted in the middle, the message will not have been
acknowledged and as a result will be replayed, so we maintain our strong
at-least-once guarantees.
Another distinguishing feature of Stateless NiFi is that it is single-threaded.
This allows it to be far more scalable and to consume far fewer resources.
Single-threaded execution works by allowing ProcessSession.commit() to enqueue
data for the next
Processor in the chain and then invoke the next Processor (recursively) before
ProcessSession.commit() ever returns.
Unfortunately, though, some dataflows do not work well with such a model. Any
flow that has MergeContent or MergeRecord in the middle will end up in a
situation where the Processor never makes progress. Take, for example, the
following dataflow:
GetFile --> SplitText --> ReplaceText --> MergeContent --> PutS3Object
In this case, assume that SplitText splits an incoming FlowFile into 10 smaller
FlowFiles. ReplaceText performs some manipulation. MergeContent is then
expected to merge all 10 FlowFiles back into one.
However, because of how this model works, after SplitText the queue will hold
10 FlowFiles. ReplaceText will then be called, which will consume one
FlowFile, manipulate it, and call ProcessSession.commit(). This will then
enqueue the FlowFile for MergeContent. MergeContent will be triggered but will
be unable to make progress because it doesn't have enough FlowFiles. The only
choice that the framework has is to then call MergeContent again until its
entire queue is emptied, but the queue will never empty. As a result, the
dataflow will end up in an infinite loop, calling MergeContent, which will make
no progress.
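The stall can be reproduced with a small, self-contained toy model. This is a hypothetical sketch, not NiFi code: FlowFiles are plain strings, each "processor" is a method, a "commit" enqueues downstream and immediately triggers the next processor, and the framework's repeated triggering of MergeContent is capped at an arbitrary 1,000 attempts so the program terminates instead of looping forever.

{code:java}
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical toy model of the "commit() triggers the next processor" engine.
// Not NiFi code: FlowFiles are Strings and each processor is a method.
class DepthFirstEngine {
    static final int MERGE_BATCH = 10;          // MergeContent waits for 10 FlowFiles
    static final int MAX_MERGE_RETRIES = 1_000; // arbitrary cap so the demo terminates

    final Deque<String> replaceQueue = new ArrayDeque<>();
    final Deque<String> mergeQueue = new ArrayDeque<>();
    int mergeInvocations = 0;
    boolean merged = false;
    boolean gaveUp = false;

    void splitText(String original) {
        for (int i = 0; i < MERGE_BATCH; i++) {
            replaceQueue.add(original + "-part" + i);
        }
        replaceText(); // "commit" hands control to the next processor
    }

    void replaceText() {
        String flowFile = replaceQueue.poll();
        if (flowFile == null) {
            return;
        }
        mergeQueue.add(flowFile.toUpperCase());
        // "commit": enqueue downstream, then run the next processor before returning
        drainMergeContent();
    }

    void drainMergeContent() {
        // The framework keeps triggering MergeContent until its queue is empty.
        int attempts = 0;
        while (!mergeQueue.isEmpty()) {
            mergeInvocations++;
            if (mergeQueue.size() >= MERGE_BATCH) {
                mergeQueue.clear();
                merged = true;
            } else if (++attempts >= MAX_MERGE_RETRIES) {
                gaveUp = true; // a real engine would keep looping here forever
                return;
            }
        }
    }
}
{code}

After {{new DepthFirstEngine().splitText("ff")}}, nine FlowFiles are still waiting for ReplaceText, one is stuck in MergeContent's queue, and {{merged}} remains {{false}}; raising the cap only makes the loop longer.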
What we really want to do is to call ReplaceText repeatedly until its queue is
empty and only then move on to the next Processor (MergeContent).
Unfortunately, this can't be accomplished with the current semantics. If we
tried to do so, when ReplaceText is triggered the first time and calls
ProcessSession.commit(), we would have two choices:
* Recursively call ReplaceText.onTrigger(). This very quickly results in a
StackOverflowError, so this approach doesn't work well.
* Have ProcessSession.commit() block while another thread is responsible for
calling ReplaceText.onTrigger(). This results in spawning a new thread for each
FlowFile in the queue, which can very quickly exhaust the available threads,
leading to an OutOfMemoryError (or, even worse, depending on system/JVM
settings, causing the entire operating system to crash).
So neither approach is viable.
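The first option's failure mode is easy to reproduce in plain Java. The hypothetical sketch below drains a simulated queue of ten million FlowFiles (an arbitrary size chosen to comfortably exceed a default JVM thread stack) once by recursion, using one stack frame per FlowFile, and once with a loop that uses constant stack space.

{code:java}
// Hypothetical illustration: why "recursively call onTrigger() per FlowFile" fails.
class RecursiveDrain {
    // Each call handles one FlowFile, then "commits", which re-triggers itself.
    static long triggerRecursively(long remaining, long processed) {
        if (remaining == 0) {
            return processed;
        }
        return triggerRecursively(remaining - 1, processed + 1); // one frame per FlowFile
    }

    // The same work driven by a loop needs only a single stack frame.
    static long triggerInLoop(long remaining) {
        long processed = 0;
        while (remaining > 0) {
            remaining--;
            processed++;
        }
        return processed;
    }

    // Returns true if the recursive drain runs out of stack.
    static boolean recursionOverflows(long queueSize) {
        try {
            triggerRecursively(queueSize, 0);
            return false;
        } catch (StackOverflowError e) {
            return true;
        }
    }
}
{code}

{{RecursiveDrain.recursionOverflows(10_000_000L)}} returns {{true}}, while {{triggerInLoop(10_000_000L)}} simply returns the count. The JVM does not eliminate tail calls, so placing the recursive call in tail position does not help.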
Additionally, any dataflow that has a self-loop, such as a failure loop, has the
same issue as above, resulting in a StackOverflowError.
The idea here, then, is to deprecate ProcessSession.commit() in favor of a new
ProcessSession.commitAsync(). (Perhaps there will be a better name, but we'll
refer to it as such for the time being.) The key difference is that
commitAsync() would allow an optional callback to be invoked after the session
commit completes:
* void commitAsync();
* void commitAsync(Runnable successCallback);
* void commitAsync(Consumer<Throwable> failureCallback);
* void commitAsync(Runnable successCallback, Consumer<Throwable>
failureCallback);
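Expressed as a Java interface, the four proposed overloads might look like the following. This is a sketch, not NiFi's actual API: the interface name {{AsyncCommitSession}} is hypothetical, and having the three shorter overloads delegate to the two-argument form with no-op defaults is a design assumption.

{code:java}
import java.util.function.Consumer;

// Hypothetical interface sketching the proposed overloads; not the real ProcessSession.
interface AsyncCommitSession {
    // Commits and invokes one of the callbacks once the outcome is known.
    void commitAsync(Runnable successCallback, Consumer<Throwable> failureCallback);

    // No callbacks: the caller does not care when the commit completes.
    default void commitAsync() {
        commitAsync(() -> { }, failure -> { });
    }

    // Success-only callback, e.g. acknowledging a message on the source system.
    default void commitAsync(Runnable successCallback) {
        commitAsync(successCallback, failure -> { });
    }

    // Failure-only callback, e.g. releasing a resource after a failed commit.
    default void commitAsync(Consumer<Throwable> failureCallback) {
        commitAsync(() -> { }, failureCallback);
    }
}
{code}

Because a zero-argument lambda can only target {{Runnable}} and a one-argument lambda can only target {{Consumer<Throwable>}}, calls such as {{commitAsync(() -> ...)}} are not ambiguous.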
Now, for most Processors, there is no need to call ProcessSession.commit()
because the abstract parent takes care of it. For those that do call
ProcessSession.commit(), it is typically because they need to perform some
cleanup action after the commit, such as in the JMS case illustrated above. In
these cases, the logic would be updated to perform the cleanup in the
callback.
So, instead of using logic such as:
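{code:java}
// JMSMessage, jmsConsumer, createFlowFile() and REL_SUCCESS are illustrative
JMSMessage message = jmsConsumer.consume();
FlowFile flowFile = createFlowFile(message, session);
session.transfer(flowFile, REL_SUCCESS);
session.commit();       // blocks until the FlowFile has been persisted
message.acknowledge();  // safe only because commit() has already returned
{code}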
The logic should be something more akin to:
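{code:java}
// JMSMessage, jmsConsumer, createFlowFile() and REL_SUCCESS are illustrative
JMSMessage message = jmsConsumer.consume();
FlowFile flowFile = createFlowFile(message, session);
session.transfer(flowFile, REL_SUCCESS);
// Method reference: acknowledge only once the commit has succeeded
session.commitAsync(message::acknowledge);
{code}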
In the case of the traditional NiFi engine,
{{void commitAsync(Runnable successCallback)}} would be implemented something like:
{code:java}
void commitAsync(Runnable successCallback) {
    commit();               // returns only once the FlowFiles are persisted
    successCallback.run();  // so the callback can safely run right away
}{code}
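Extending that idea to the two-argument overload, a traditional-engine implementation could run the success callback only after the synchronous commit returns and route any commit exception to the failure callback. The class below is a hypothetical stand-in: {{persist()}} represents the existing synchronous commit() logic, and whether the exception should also be rethrown after the failure callback runs is left open (here it is not).

{code:java}
import java.util.function.Consumer;

// Hypothetical stand-in for a traditional (synchronous) session; not NiFi code.
class SynchronousSession {
    private final boolean failOnCommit; // lets the sketch simulate a repository failure
    int persistedCommits = 0;

    SynchronousSession(boolean failOnCommit) {
        this.failOnCommit = failOnCommit;
    }

    // Represents the existing commit(): returns only once data is persisted.
    private void persist() {
        if (failOnCommit) {
            throw new IllegalStateException("repository write failed");
        }
        persistedCommits++;
    }

    void commitAsync(Runnable successCallback, Consumer<Throwable> failureCallback) {
        try {
            persist();
        } catch (RuntimeException e) {
            failureCallback.accept(e); // data was not persisted, so do not acknowledge
            return;
        }
        // Outside the try block so a failing callback is not mistaken for a failed commit.
        successCallback.run();
    }
}
{code}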
However, in Stateless (or any other engine that may be developed), the engine
could instead simply maintain a stack of callbacks, pushing the success
callback onto the stack whenever session.commitAsync() is called.
Only when the entire dataflow has completed would it unwind the stack of
callbacks.
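A minimal sketch of such a stack follows, assuming (the text above does not specify this) that callbacks are unwound in last-in, first-out order, so the source processor's callback, registered first, runs last. {{StatelessCallbackStack}} and its methods are hypothetical names.

{code:java}
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch of how a Stateless engine might defer success callbacks.
class StatelessCallbackStack {
    private final Deque<Runnable> successCallbacks = new ArrayDeque<>();

    // Called from commitAsync(): nothing is persisted, the callback is only remembered.
    void commitAsync(Runnable successCallback) {
        successCallbacks.push(successCallback);
    }

    int pending() {
        return successCallbacks.size();
    }

    // Called once the whole dataflow has completed successfully.
    void onDataflowComplete() {
        while (!successCallbacks.isEmpty()) {
            successCallbacks.pop().run(); // LIFO: the most recently registered runs first
        }
    }
}
{code}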
With this approach, in the dataflow described above, ReplaceText can be
triggered repeatedly until its queue is empty. Then, the next Processor can be
triggered repeatedly until its queue is empty. Even after the data has been
merged, if the process dies or is restarted, GetFile's success callback has not
yet been triggered, so the data is still available.
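The scheduling described here can be sketched as a small, self-contained simulation: each processor runs until its input queue is empty before the next one starts, and GetFile's success callback waits on a stack until the last processor finishes. As an assumption for illustration only, FlowFiles are strings, every name is hypothetical, and MergeContent is simplified to "merge once ten FlowFiles are present".

{code:java}
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical toy engine: drain each processor's queue before moving downstream.
class BreadthFirstEngine {
    static final int MERGE_BATCH = 10;

    final Deque<String> replaceQueue = new ArrayDeque<>();
    final Deque<String> mergeQueue = new ArrayDeque<>();
    final Deque<Runnable> callbacks = new ArrayDeque<>();
    final List<String> delivered = new ArrayList<>(); // what "PutS3Object" received
    boolean sourceAcknowledged = false;

    void run(String original) {
        // GetFile: defer its cleanup/acknowledgment until the whole flow completes.
        callbacks.push(() -> sourceAcknowledged = true);

        // SplitText: one FlowFile in, ten out.
        for (int i = 0; i < MERGE_BATCH; i++) {
            replaceQueue.add(original + "-part" + i);
        }

        // ReplaceText: triggered repeatedly until its queue is empty.
        while (!replaceQueue.isEmpty()) {
            mergeQueue.add(replaceQueue.poll().toUpperCase());
        }

        // MergeContent: now sees all ten FlowFiles at once.
        if (mergeQueue.size() >= MERGE_BATCH) {
            delivered.add(String.join(",", mergeQueue));
            mergeQueue.clear();
        }

        // PutS3Object has finished: only now unwind the deferred callbacks.
        if (!delivered.isEmpty()) {
            while (!callbacks.isEmpty()) {
                callbacks.pop().run();
            }
        }
    }
}
{code}

Running {{new BreadthFirstEngine().run("ff")}} delivers a single merged FlowFile and only then sets {{sourceAcknowledged}}.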
However, as soon as PutS3Object calls {{ProcessSession.commit()}} (because it
is the last processor in the chain and it auto-terminates the FlowFile) the
stack of callbacks can be unwound and called. As a result, GetFile's Success
Callback is triggered only after successful completion of the entire dataflow.
If any processor along the way rolls back the session or throws an uncaught
Exception, the entire session is rolled back, ensuring no data loss.
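The rollback path might be modeled as follows, assuming (as a hypothetical design choice) that a rollback discards every pending success callback and passes the cause to each registered failure callback. Because no success callback ever runs, the source is never acknowledged and the data will be redelivered. {{DeferredCommitTracker}} and its methods are illustrative names only.

{code:java}
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch: deferred callbacks are either all run or all discarded.
class DeferredCommitTracker {
    private final Deque<Runnable> onSuccess = new ArrayDeque<>();
    private final List<Consumer<Throwable>> onFailure = new ArrayList<>();

    void commitAsync(Runnable successCallback, Consumer<Throwable> failureCallback) {
        onSuccess.push(successCallback);
        onFailure.add(failureCallback);
    }

    // A processor rolled back or threw: nothing may be acknowledged.
    void rollback(Throwable cause) {
        onSuccess.clear();
        for (Consumer<Throwable> callback : onFailure) {
            callback.accept(cause);
        }
        onFailure.clear();
    }

    // The whole dataflow succeeded: run success callbacks, newest first.
    void complete() {
        onFailure.clear();
        while (!onSuccess.isEmpty()) {
            onSuccess.pop().run();
        }
    }
}
{code}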
There are a few other important considerations to take note of:
* Because StandardProcessSession.commitAsync() would behave as described
above, simply calling commit() and then any provided callbacks, the change in how
traditional NiFi operates is quite minimal and therefore fairly low risk. The
changes to Stateless NiFi are higher risk, but Stateless is generally still
considered somewhat experimental and expected to evolve significantly.
* This removes a very large barrier to entry for Stateless NiFi, which is that
it's very difficult for users to know which flows can and cannot be used in
Stateless. With these changes, Stateless should be able to run almost any flow
that traditional NiFi can.
* Processors should migrate to the new API, but for probably more than 95% of
processors, the necessary changes will be trivial.
* The Mock Framework should be updated to cause a failure on a call to
ProcessSession.commit() in order to ensure that the new commitAsync() is being
used. The TestRunner should enable this by default but allow the requirement
for commitAsync() to be disabled by calling something like
{{TestRunner.allowSynchronousCommits()}}. Of course, this would only take
effect once users choose to upgrade to version 1.14.0 (or whatever version this
is released in) of the Mock Framework. Building against older versions of the
Mock Framework would not expose this behavior.
* If any Processor does still use {{ProcessSession.commit()}} and is run in
Stateless NiFi, the Stateless engine will be required to then trigger the next
Processor in the flow before returning from {{ProcessSession.commit()}}.
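The proposed Mock Framework check could be approximated with a flag on a test session. {{MockSessionSketch}} is a hypothetical class, not part of the real Mock Framework; only the opt-out name {{allowSynchronousCommits()}} comes from the proposal (which itself calls it "something like" the final name), and placing it on the session rather than on the TestRunner is a simplification.

{code:java}
// Hypothetical sketch of the proposed Mock Framework check; not the real mock API.
class MockSessionSketch {
    private boolean synchronousCommitsAllowed = false; // strict by default, per the proposal
    int commits = 0;

    // Mirrors the proposed TestRunner.allowSynchronousCommits() opt-out.
    void allowSynchronousCommits() {
        synchronousCommitsAllowed = true;
    }

    // Deprecated path: fails the test unless explicitly allowed.
    void commit() {
        if (!synchronousCommitsAllowed) {
            throw new AssertionError(
                "ProcessSession.commit() called; use commitAsync() or allowSynchronousCommits()");
        }
        commits++;
    }

    void commitAsync(Runnable successCallback) {
        commits++;
        successCallback.run(); // the mock commits immediately, like the traditional engine
    }
}
{code}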
--
This message was sent by Atlassian Jira
(v8.3.4#803005)