[
https://issues.apache.org/jira/browse/NIFI-8469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17347647#comment-17347647
]
ASF subversion and git services commented on NIFI-8469:
-------------------------------------------------------
Commit ecacfdaa4c663ff2831dd13a840aabd0ff3349e8 in nifi's branch
refs/heads/main from Mark Payne
[ https://gitbox.apache.org/repos/asf?p=nifi.git;h=ecacfda ]
NIFI-8469: Introduced ProcessSession.commitAsync and updated processors to use
it. Deprecated ProcessSession.commit()
- Updated Mock Framework to now fail tests that use ProcessSession.commit()
unless they first call TestRunner.setAllowSynchronousSessionCommits(true)
- Updated Stateless NiFi to make use of asynchronous session commits
- Fixed a bug that caused Stateless NiFi to not properly handle Additional
Classpath URLs, and a bug that caused validation warnings to be generated when a
flow that used controller services was initialized. While this is not really in
scope of NIFI-8469, it was found during testing and blocked further progress, so
it is addressed here.
- If a Processor fails to make progress when run in Stateless NiFi, trigger the
flow from its start until that is no longer the case
- Introduced notion of TransactionThresholds that can limit the amount of data
that a flow will bring in for a given invocation of stateless dataflow
- Several new system-level tests
> Change ProcessSession so that commits are asynchronous
> ------------------------------------------------------
>
> Key: NIFI-8469
> URL: https://issues.apache.org/jira/browse/NIFI-8469
> Project: Apache NiFi
> Issue Type: New Feature
> Components: Core Framework, Extensions
> Reporter: Mark Payne
> Assignee: Mark Payne
> Priority: Critical
> Time Spent: 4h 10m
> Remaining Estimate: 0h
>
> Currently, ProcessSession.commit() guarantees that when the method call
> returns, all FlowFiles have been persisted to the repositories. As such, it
> is safe to acknowledge/dispose of the data on the external system.
> For example, a processor that consumes from JMS will consume the message,
> create a FlowFile from it, and then call ProcessSession.commit(). Only then
> is it safe to acknowledge the message on the JMS broker. If it is
> acknowledged prior to calling ProcessSession.commit(), and NiFi is restarted,
> the data may be lost. But after calling ProcessSession.commit(), it is safe
> because the content will be available to NiFi upon restart.
> This API has served us well. However, lately there has also been a great deal
> of work and desire from the community to introduce other "runtimes." One of
> those is MiNiFi. Another is Stateless NiFi.
> One of the distinguishing features of Stateless NiFi is that it stores
> content in-memory. This means that fast (and large) disks are not necessary,
> but it also means that upon restarting the application, all FlowFiles are
> lost. In order to provide data reliability, Stateless NiFi requires that the
> source of data be both reliable and replayable (like a JMS Broker or Apache
> Kafka, for instance). In this way, we can keep content in-memory by deferring
> the message acknowledgment until after we've finished processing the message
> completely. If the application is restarted in the middle, the message will
> not have been acknowledged and as a result will be replayed, so we maintain
> our strong at-least-once guarantees.
> Another distinguishing feature of Stateless NiFi is that it is
> single-threaded. This allows us to be far more scalable and consume fewer
> resources.
> This works by allowing ProcessSession.commit() to enqueue data for the next
> Processor in the chain and then invoke the next Processor (recursively)
> before ProcessSession.commit() ever returns.
> Unfortunately, though, some dataflows do not work well with such a model. Any
> flow that has MergeContent or MergeRecord in the middle will end up in a
> situation where the Processor never makes progress. Take, for example, the
> following dataflow:
> GetFile --> SplitText --> ReplaceText --> MergeContent --> PutS3Object
> In this case, assume that SplitText splits an incoming FlowFile into 10
> smaller FlowFiles. ReplaceText performs some manipulation. MergeContent is
> then expected to merge all 10 FlowFiles back into one.
> However, because of the nature of how this works, after SplitText, the queue
> will have 10 FlowFiles. ReplaceText will then be called, which will consume
> one FlowFile, manipulate it, and call ProcessSession.commit(). This will then
> enqueue the FlowFile for MergeContent. MergeContent will be triggered but
> will be unable to make progress because it doesn't have enough FlowFiles. The
> only choice that the framework has is to then call MergeContent again until
> its entire queue is emptied, but the queue will never empty. As a result, the
> dataflow will end up in an infinite loop, calling MergeContent, which will
> make no progress.
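To make this failure mode concrete, here is a small simulation of the recursive model, in which each commit() immediately triggers the next Processor before returning. It is a sketch under stated assumptions: strings stand in for FlowFiles, MergeContent is assumed to merge only once all 10 splits are queued, and the hypothetical `MAX_MERGE_ATTEMPTS` cap replaces the infinite loop so that the program terminates. None of these names are NiFi APIs.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/**
 * Hypothetical simulation of the recursive model: commit() hands control to the
 * next Processor before returning. Strings stand in for FlowFiles.
 */
class RecursiveCommitModel {
    static final int SPLIT_COUNT = 10;
    static final int MAX_MERGE_ATTEMPTS = 1_000; // stands in for "forever"

    final Deque<String> replaceQueue = new ArrayDeque<>();
    final Deque<String> mergeQueue = new ArrayDeque<>();
    final List<String> merged = new ArrayList<>();
    boolean stuck = false;

    void splitText(String input) {
        for (int i = 0; i < SPLIT_COUNT; i++) {
            replaceQueue.add(input + "-part" + i);
        }
        replaceText(); // commit(): immediately trigger the next Processor
    }

    void replaceText() {
        String flowFile = replaceQueue.poll();
        mergeQueue.add(flowFile.toUpperCase());
        // commit(): the engine re-triggers MergeContent until its queue is empty
        int attempts = 0;
        while (!mergeQueue.isEmpty()) {
            if (!mergeContent() && ++attempts >= MAX_MERGE_ATTEMPTS) {
                stuck = true; // a real engine would keep spinning here
                return;
            }
        }
    }

    /** Merges only when a full bin of SPLIT_COUNT FlowFiles is queued. */
    boolean mergeContent() {
        if (mergeQueue.size() < SPLIT_COUNT) {
            return false; // no progress: not enough FlowFiles yet
        }
        StringBuilder bin = new StringBuilder();
        while (!mergeQueue.isEmpty()) {
            bin.append(mergeQueue.poll());
        }
        merged.add(bin.toString());
        return true;
    }
}
```

After `splitText("file")` returns, `stuck` is true, a single FlowFile sits in MergeContent's queue, and the nine remaining splits are never processed by ReplaceText.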
> What we really want to do is to call ReplaceText repeatedly until its queue
> is empty and only then move on to the next Processor (MergeContent).
> Unfortunately, this can't really be accomplished with the current semantics.
> If we tried to do so, when ReplaceText is triggered the first time and calls
> ProcessSession.commit(), we would have two choices:
> * Recursively call ReplaceText.onTrigger(). This very quickly results in a
> StackOverflowError, so this approach doesn't work well.
> * Have ProcessSession.commit() block while another thread is responsible for
> calling ReplaceText.onTrigger(). This results in spawning a new thread for
> each FlowFile in the queue, which can very quickly exhaust the available
> threads, leading to an OutOfMemoryError (or, even worse, depending on
> system/JVM settings, causing the entire operating system to crash).
> So neither approach is viable.
> Additionally, any dataflow that has a self-loop, such as a failure loop, has
> the same issue as above, resulting in a StackOverflowError.
> The idea here, then, is to deprecate ProcessSession.commit() in favor of a
> new ProcessSession.commitAsync(). (Perhaps there will be a better name, but
> we'll refer to it as such for the time being). The differentiator here is
> that commitAsync() would allow for an optional callback method to be invoked
> after the session commit completes:
> * void commitAsync();
> * void commitAsync(Runnable successCallback);
> * void commitAsync(Consumer<Throwable> failureCallback);
> * void commitAsync(Runnable successCallback, Consumer<Throwable>
> failureCallback);
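One possible way to organize the four proposed overloads is to make the two-argument form the only abstract method and have the other three delegate to it. This is a hypothetical sketch: `AsyncCommittable` and `RecordingSession` are illustrative names, not NiFi types, and rethrowing the error when no failure callback is supplied is an assumption about the intended semantics.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical interface mirroring the proposed overloads (not the real ProcessSession). */
interface AsyncCommittable {
    // The only abstract method; every other overload delegates here.
    void commitAsync(Runnable successCallback, Consumer<Throwable> failureCallback);

    default void commitAsync() {
        commitAsync(() -> { }, AsyncCommittable::rethrow);
    }

    default void commitAsync(Runnable successCallback) {
        commitAsync(successCallback, AsyncCommittable::rethrow);
    }

    default void commitAsync(Consumer<Throwable> failureCallback) {
        commitAsync(() -> { }, failureCallback);
    }

    // Assumption: with no failure callback, the error is surfaced to the caller.
    static void rethrow(Throwable t) {
        if (t instanceof RuntimeException) {
            throw (RuntimeException) t;
        }
        if (t instanceof Error) {
            throw (Error) t;
        }
        throw new RuntimeException(t);
    }
}

/** Toy implementation that records what happened so the delegation can be observed. */
class RecordingSession implements AsyncCommittable {
    final List<String> events = new ArrayList<>();
    boolean failCommit = false;

    @Override
    public void commitAsync(Runnable successCallback, Consumer<Throwable> failureCallback) {
        if (failCommit) {
            failureCallback.accept(new IllegalStateException("commit failed"));
            return;
        }
        events.add("committed");
        successCallback.run();
    }
}
```

Because a zero-argument lambda can only target `Runnable` and a one-argument lambda can only target `Consumer<Throwable>`, calls such as `commitAsync(() -> ...)` and `commitAsync(t -> ...)` resolve to distinct overloads without casts.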
> Now, for most Processors, there is no need to call ProcessSession.commit()
> because the abstract parent class (AbstractProcessor) takes care of it. For
> those that do call ProcessSession.commit(), it is typically because they need
> to perform some cleanup action after the commit, such as in the JMS case
> illustrated above. In such cases, the logic would be updated to perform the
> cleanup in the callback.
> So, instead of using logic such as:
> {code:java}
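> JMSMessage message = jmsConsumer.consume();
> FlowFile flowFile = createFlowFile(message, session);
> session.transfer(flowFile, REL_SUCCESS);
> session.commit();
> // Safe only because commit() has returned: the FlowFile is now persisted
> message.acknowledge();{code}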
> The logic should be something more akin to:
> {code:java}
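> JMSMessage message = jmsConsumer.consume();
> FlowFile flowFile = createFlowFile(message, session);
> session.transfer(flowFile, REL_SUCCESS);
> // Acknowledge only once the framework reports that the commit succeeded
> session.commitAsync(message::acknowledge); {code}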
> In the case of the traditional NiFi engine, {{void commitAsync(Runnable
> successCallback)}} would be implemented something like:
> {code:java}
> void commitAsync(Runnable successCallback) {
> commit();
> successCallback.run();
> }{code}
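The traditional-engine version above can be extended to the two-callback overload along the following lines. This is a sketch, not StandardProcessSession: repository persistence is simulated with in-memory lists, and the hypothetical `repositoryAvailable` flag exists only to exercise the failure path.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/**
 * Hypothetical traditional-engine session: commitAsync() commits synchronously
 * and then runs the appropriate callback on the caller's thread.
 */
class SynchronousCommitSession {
    final List<String> pending = new ArrayList<>();
    final List<String> durable = new ArrayList<>();
    boolean repositoryAvailable = true;

    void commit() {
        if (!repositoryAvailable) {
            throw new IllegalStateException("FlowFile repository unavailable");
        }
        durable.addAll(pending); // once commit() returns, the data counts as persisted
        pending.clear();
    }

    void commitAsync(Runnable successCallback, Consumer<Throwable> failureCallback) {
        try {
            commit();
        } catch (RuntimeException e) {
            failureCallback.accept(e); // the success callback is never run on failure
            return;
        }
        // Outside the try: an exception from the callback is not reported as a failed commit.
        successCallback.run();
    }
}
```

Running the success callback outside the `try` block is a deliberate choice in this sketch: an exception thrown while acknowledging the external system is not mistaken for a failed commit, since the data has already been persisted at that point.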
> However, Stateless NiFi (or any other engine that may be developed) could
> instead simply maintain a stack of callbacks, pushing the success callback
> onto the stack when session.commitAsync() is called.
> Only when the entire dataflow has completed would it unwind the stack of
> callbacks.
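A stateless engine could implement this deferred behavior with a LIFO stack, as sketched below. The class is hypothetical; keeping a parallel stack of failure callbacks, and invoking them when the dataflow fails, goes beyond what the text describes and is an assumption.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Consumer;

/**
 * Hypothetical stateless-style session: commitAsync() only records callbacks.
 * Nothing is acknowledged until the whole dataflow has completed.
 */
class DeferredCommitSession {
    private final Deque<Runnable> successCallbacks = new ArrayDeque<>();
    private final Deque<Consumer<Throwable>> failureCallbacks = new ArrayDeque<>();

    void commitAsync(Runnable successCallback, Consumer<Throwable> failureCallback) {
        successCallbacks.push(successCallback);   // deferred, not run yet
        failureCallbacks.push(failureCallback);
    }

    /** Called once the last Processor has finished: unwind, most recent first. */
    void dataflowCompleted() {
        failureCallbacks.clear();
        while (!successCallbacks.isEmpty()) {
            successCallbacks.pop().run();
        }
    }

    /** Called on rollback or an uncaught exception: nothing is acknowledged. */
    void dataflowFailed(Throwable cause) {
        successCallbacks.clear();
        while (!failureCallbacks.isEmpty()) {
            failureCallbacks.pop().accept(cause);
        }
    }

    int pendingCallbacks() {
        return successCallbacks.size();
    }
}
```

Because callbacks are popped in reverse order of registration, the callback of the first Processor in the flow (for example, GetFile) runs last, after every downstream Processor's callback has run.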
> With this approach, it means that in the dataflow described above,
> ReplaceText can be called continually until its queue is emptied. Then, the
> next Processor can be called continually until its queue is emptied. Even if
> the data has been merged, if the process dies or is restarted, the Success
> Callback of GetFile has not been triggered, so the data is still available.
> However, as soon as PutS3Object commits its session (because it is the last
> processor in the chain and it auto-terminates the FlowFile), the
> stack of callbacks can be unwound and called. As a result, GetFile's Success
> Callback is triggered only after successful completion of the entire
> dataflow. If any processor along the way rolls back the session or throws an
> uncaught Exception, the entire session is rolled back, ensuring no data loss.
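Putting both ideas together, the following sketch runs the example flow single-threaded: each Processor is triggered until its input queue is empty before the next one starts, and success callbacks are unwound only after the final Processor finishes. It is a simplified model under stated assumptions: `Stage`, `DrainingScheduler`, and `ExampleFlow` are hypothetical, strings stand in for FlowFiles, and MergeContent is assumed to need all 10 splits to form a bin.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

/** Hypothetical Processor in a linear flow; returns false if it made no progress. */
interface Stage {
    boolean onTrigger(Deque<String> in, Deque<String> out, Deque<Runnable> callbacks);
}

/** Hypothetical single-threaded scheduler that drains each queue before moving on. */
class DrainingScheduler {
    static List<String> run(List<Stage> stages, String input) {
        Deque<Runnable> callbacks = new ArrayDeque<>();
        Deque<String> queue = new ArrayDeque<>(Arrays.asList(input));
        try {
            for (Stage stage : stages) {
                Deque<String> out = new ArrayDeque<>();
                while (!queue.isEmpty()) { // trigger until this Processor's queue is empty
                    if (!stage.onTrigger(queue, out, callbacks)) {
                        throw new IllegalStateException("Processor made no progress");
                    }
                }
                queue = out;
            }
        } catch (RuntimeException e) {
            callbacks.clear(); // roll back: no success callback (e.g. GetFile's) ever runs
            throw e;
        }
        while (!callbacks.isEmpty()) {
            callbacks.pop().run(); // whole flow succeeded: unwind the stack
        }
        return new ArrayList<>(queue);
    }
}

/** The example flow from the text: GetFile -> SplitText -> ReplaceText -> MergeContent -> PutS3Object. */
class ExampleFlow {
    static final int PARTS = 10;
    final List<String> events = new ArrayList<>();

    List<Stage> stages() {
        Stage getFile = (in, out, cb) -> {
            String file = in.poll();
            out.add(file);
            cb.push(() -> events.add("GetFile: source " + file + " removed"));
            return true;
        };
        Stage splitText = (in, out, cb) -> {
            String file = in.poll();
            for (int i = 0; i < PARTS; i++) {
                out.add(file + "#" + i);
            }
            return true;
        };
        Stage replaceText = (in, out, cb) -> {
            out.add(in.poll().toUpperCase());
            return true;
        };
        Stage mergeContent = (in, out, cb) -> {
            if (in.size() < PARTS) {
                return false; // cannot complete a bin yet
            }
            StringBuilder bin = new StringBuilder();
            while (!in.isEmpty()) {
                bin.append(in.poll());
            }
            out.add(bin.toString());
            return true;
        };
        Stage putS3Object = (in, out, cb) -> {
            events.add("PutS3Object: uploaded " + in.poll().length() + " chars");
            return true; // FlowFile auto-terminated: nothing passed downstream
        };
        return Arrays.asList(getFile, splitText, replaceText, mergeContent, putS3Object);
    }
}
```

For the input `a.txt`, MergeContent sees all ten splits in a single trigger, PutS3Object records a 70-character upload, and only then does GetFile's callback run. If any stage throws, the callback stack is cleared, so GetFile's callback never runs and the source data remains available.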
> There are a few other important considerations to take note of:
> * Because StandardProcessSession.commitAsync() would behave as described
> above, just calling commit() and then any provided callbacks, the change in
> how traditional NiFi operates is quite minimal and therefore fairly low risk.
> The changes to Stateless NiFi are higher risk but Stateless is generally
> still considered somewhat experimental and expected to evolve pretty heavily.
> * This removes a very large barrier to entry for Stateless NiFi, which is
> that it's very difficult for users to know which flows can and cannot be used
> in Stateless. With these changes, Stateless should be able to run almost any
> flow that traditional NiFi can.
> * This means that processors should move to a new API, but for probably
> more than 95% of processors, the necessary changes will be trivial.
> * The Mock Framework should be updated to cause a failure on a call to
> ProcessSession.commit() in order to ensure that the new commitAsync() is
> being used. The TestRunner should enable this by default but allow the
> requirement for commitAsync() to be disabled by calling something like
> {{TestRunner.allowSynchronousCommits()}}. Of course, this would only occur
> when users choose to upgrade to 1.14.0 (or whatever version this is released
> in) of the Mock Framework. Building against older versions of the Mock
> Framework would not expose this behavior.
> * If any Processor does still use {{ProcessSession.commit()}} and is run in
> Stateless NiFi, the Stateless engine will be required to then trigger the
> next Processor in the flow before returning from {{ProcessSession.commit()}}.
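The proposed Mock Framework guard could work roughly like the toy session below. The commit message at the top of this thread names the actual switch as `TestRunner.setAllowSynchronousSessionCommits(true)`; `GuardedMockSession` and its methods are hypothetical and only illustrate the behavior of failing by default and permitting synchronous commits when a test opts in.

```java
/**
 * Hypothetical mock session: the deprecated synchronous commit() fails the test
 * unless the test has explicitly opted in. Not the actual nifi-mock API.
 */
class GuardedMockSession {
    private boolean allowSynchronousCommits = false; // strict by default
    private int commits = 0;

    void setAllowSynchronousCommits(boolean allow) {
        this.allowSynchronousCommits = allow;
    }

    @Deprecated
    void commit() {
        if (!allowSynchronousCommits) {
            throw new AssertionError("ProcessSession.commit() called; use commitAsync() "
                    + "or explicitly allow synchronous commits in this test");
        }
        commits++;
    }

    void commitAsync(Runnable successCallback) {
        commits++;
        successCallback.run(); // mock: the commit "succeeds" immediately
    }

    int commitCount() {
        return commits;
    }
}
```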
--
This message was sent by Atlassian Jira
(v8.3.4#803005)