Mark Payne created NIFI-14679:
---------------------------------
Summary: When incrementing a Counter from a Processor,
TerminatedTaskException always thrown if Processor has been terminated
Key: NIFI-14679
URL: https://issues.apache.org/jira/browse/NIFI-14679
Project: Apache NiFi
Issue Type: Bug
Components: Core Framework
Reporter: Mark Payne
Assignee: Mark Payne
If a Processor calls {{ProcessSession.adjustCounter(name, value)}} (or
equivalently {{ProcessSession.adjustCounter(name, value, false)}}) after the
Processor has been terminated, the call throws a {{TerminatedTaskException}},
which is correct. However, if the Processor calls
{{ProcessSession.adjustCounter(name, value, true)}} to increment the counter
immediately, a {{TerminatedTaskException}} is still thrown when it should not
be. The docs state:
{code:java}
immediate – if true, the counter will be updated immediately, without regard to
whether the session is committed or rolled back; otherwise, the counter will be
incremented only if and when the session is committed.
{code}
This is a bit ambiguous, as it does not explicitly discuss what happens if the
session has been terminated. However, there is at least a weak implication that
the counter should be incremented, even if the processor has been terminated.
Moreover, {{ProcessSession.commitAsync(Runnable, Consumer<Throwable>)}} invokes
the failure callback when a stateless flow times out, and it is reasonable to
want to adjust counters from that callback. In fact, ConsumeKafka already does
so.
We need to update the logic such that we throw a {{TerminatedTaskException}}
only if the {{immediate}} flag is {{false}}.
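The intended behavior can be sketched roughly as follows. This is only an illustration under stated assumptions, not the framework's actual code: {{SketchSession}}, its fields, {{terminate()}}, {{immediateValue()}}, and the stand-in {{TerminatedTaskException}} class are hypothetical simplifications and do not mirror NiFi's real session implementation.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for NiFi's exception, so the sketch compiles on its own.
class TerminatedTaskException extends RuntimeException {
    TerminatedTaskException() {
        super("Task was terminated");
    }
}

// Hypothetical, heavily simplified session; NOT NiFi's actual implementation.
class SketchSession {
    // Counters updated right away, regardless of commit or rollback.
    private final Map<String, Long> immediateCounters = new HashMap<>();
    // Counters that would only be applied if and when the session commits.
    private final Map<String, Long> pendingCounters = new HashMap<>();
    private volatile boolean terminated;

    void terminate() {
        terminated = true;
    }

    void adjustCounter(String name, long delta) {
        adjustCounter(name, delta, false);
    }

    void adjustCounter(String name, long delta, boolean immediate) {
        if (immediate) {
            // Proposed behavior: skip the termination check for immediate updates.
            immediateCounters.merge(name, delta, Long::sum);
            return;
        }
        // Deferred updates depend on a later commit, so a terminated task still fails here.
        if (terminated) {
            throw new TerminatedTaskException();
        }
        pendingCounters.merge(name, delta, Long::sum);
    }

    long immediateValue(String name) {
        return immediateCounters.getOrDefault(name, 0L);
    }
}

class CounterSketchDemo {
    public static void main(String[] args) {
        SketchSession session = new SketchSession();
        session.terminate();

        // Immediate adjustment is still accepted after termination.
        session.adjustCounter("Records Failed", 5, true);
        System.out.println(session.immediateValue("Records Failed"));

        // Deferred adjustment is rejected after termination.
        try {
            session.adjustCounter("Records Failed", 1);
        } catch (TerminatedTaskException e) {
            System.out.println("deferred adjustment rejected: " + e.getMessage());
        }
    }
}
```

The only design point the sketch is meant to show is the ordering: the {{immediate}} branch is handled before the termination check, so a failure callback running after termination can still record its counters.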