Mark Payne created NIFI-14679:
---------------------------------

             Summary: When incrementing a Counter from a Processor, 
TerminatedTaskException always thrown if Processor has been terminated
                 Key: NIFI-14679
                 URL: https://issues.apache.org/jira/browse/NIFI-14679
             Project: Apache NiFi
          Issue Type: Bug
          Components: Core Framework
            Reporter: Mark Payne
            Assignee: Mark Payne


If a Processor calls {{ProcessSession.adjustCounter(name, value)}} (or, 
equivalently, {{ProcessSession.adjustCounter(name, value, false)}}) after the 
Processor has been terminated, the call throws a {{TerminatedTaskException}}, 
which is correct. However, a call to {{ProcessSession.adjustCounter(name, 
value, true)}}, which increments the counter immediately, currently throws a 
{{TerminatedTaskException}} as well, and it should not. The docs 
state:
{code:java}
immediate – if true, the counter will be updated immediately, without regard to 
whether the session is committed or rolled back; otherwise, the counter will be 
incremented only if and when the session is committed. {code}
This is a bit ambiguous, as it does not explicitly discuss what happens if the 
session has been terminated. However, there is at least a weak implication that 
the counter should be incremented, even if the processor has been terminated.

Moreover, {{ProcessSession.commitAsync(Runnable, Consumer<Throwable>)}} 
invokes its failure callback when a stateless flow times out, and at that 
point a Processor may reasonably want to adjust counters. In fact, ConsumeKafka 
already does so.

We need to update the logic so that a {{TerminatedTaskException}} is thrown 
only if the {{immediate}} flag is {{false}}.
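
The intended behavior can be illustrated with a minimal, self-contained sketch. This is not the NiFi framework code: {{SketchSession}}, {{SketchTerminatedTaskException}}, {{terminate()}}, and the two counter maps are hypothetical stand-ins invented for illustration, and only the {{adjustCounter(name, delta, immediate)}} shape mirrors the API discussed above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for NiFi's TerminatedTaskException (assumption, not the real class).
class SketchTerminatedTaskException extends RuntimeException {
}

// Hypothetical, simplified model of a session; not the actual framework implementation.
class SketchSession {
    // Counters applied right away, independent of commit or rollback.
    private final Map<String, Long> immediateCounters = new HashMap<>();
    // Counters that would only be applied when the session commits.
    private final Map<String, Long> pendingCounters = new HashMap<>();
    private volatile boolean terminated;

    // Marks the owning Processor as terminated.
    void terminate() {
        terminated = true;
    }

    void adjustCounter(final String name, final long delta, final boolean immediate) {
        if (immediate) {
            // Proposed behavior: skip the termination check for immediate updates.
            immediateCounters.merge(name, delta, Long::sum);
            return;
        }

        // Deferred updates depend on a commit, so a terminated task still fails here.
        if (terminated) {
            throw new SketchTerminatedTaskException();
        }
        pendingCounters.merge(name, delta, Long::sum);
    }

    long immediateValue(final String name) {
        return immediateCounters.getOrDefault(name, 0L);
    }

    long pendingValue(final String name) {
        return pendingCounters.getOrDefault(name, 0L);
    }
}
```

In this sketch, calling {{adjustCounter("errors", 1, true)}} after {{terminate()}} succeeds, while the same call with {{false}} still throws.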



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
