[ 
https://issues.apache.org/jira/browse/NIFI-14679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17985883#comment-17985883
 ] 

ASF subversion and git services commented on NIFI-14679:
--------------------------------------------------------

Commit d86d10fe92c09677e9477cc8be802995f0d27b4f in nifi's branch 
refs/heads/main from Mark Payne
[ https://gitbox.apache.org/repos/asf?p=nifi.git;h=d86d10fe92 ]

NIFI-14679: Do not throw TerminatedTaskException from 
ProcessSession.adjustCounter if immediate = true


> When incrementing a Counter from a Processor, TerminatedTaskException always 
> thrown if Processor has been terminated
> --------------------------------------------------------------------------------------------------------------------
>
>                 Key: NIFI-14679
>                 URL: https://issues.apache.org/jira/browse/NIFI-14679
>             Project: Apache NiFi
>          Issue Type: Bug
>          Components: Core Framework
>            Reporter: Mark Payne
>            Assignee: Mark Payne
>            Priority: Major
>
> If a Processor calls {{ProcessSession.adjustCounter(name, value)}} (or 
> equivalently  {{ProcessSession.adjustCounter(name, value, false)}} ) and the 
> Processor has been terminated, it throws a {{{}TerminatedTaskException{}}}, 
> which is correct. However, if it calls {{ProcessSession.adjustCounter(name, 
> value, true)}} to immediately increment the counter, a 
> {{TerminatedTaskException}} is currently thrown when it should not be. The 
> docs state:
> {code:java}
> immediate – if true, the counter will be updated immediately, without regard 
> to whether the session is committed or rolled back; otherwise, the counter 
> will be incremented only if and when the session is committed. {code}
> This is a bit ambiguous, as it does not explicitly discuss what happens if 
> the session has been terminated. However, there is at least a weak 
> implication that the counter should be incremented, even if the processor has 
> been terminated.
> Moreover, the async logic of {{ProcessSession.commitAsync(Runnable, 
> Consumer<Throwable>)}} is such that the failure callback will be called in 
> the case of a stateless flow timing out, in which case it is feasible that we 
> may want to adjust counters. In fact, we do so in ConsumeKafka already.
> We need to update the logic such that we throw a {{TerminatedException}} only 
> if the {{immediate}} flag is {{{}false{}}}.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to