[ 
https://issues.apache.org/jira/browse/NIFI-11106?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kevin Doran updated NIFI-11106:
-------------------------------
    Attachment:     (was: nifi-11106-sample-data.txt)

> QueryRecord can fail to rollback sessions on commit failure.
> ------------------------------------------------------------
>
>                 Key: NIFI-11106
>                 URL: https://issues.apache.org/jira/browse/NIFI-11106
>             Project: Apache NiFi
>          Issue Type: Bug
>          Components: Extensions
>    Affects Versions: 1.14.0, 1.13.2, 1.15.3, 1.17.0, 1.16.3, 1.18.0, 1.19.1
>            Reporter: Kevin Doran
>            Priority: Blocker
>         Attachments: query-record-config.json.txt
>
>
> It appears that, going back to at least NiFi 1.13.2, some configurations 
> of QueryRecord can leave flowfiles not properly closed before the processor 
> attempts to commit a session. This causes the session commit to fail, as a 
> session cannot be committed while a flowfile is still open for reading. 
> These failures show up as uncaught exceptions in NiFi logs (see below).
> Additionally, failed session commits should be rolled back and reprocessed. 
> However, some of these QueryRecord commit failures do not get rolled back 
> properly. When this occurs, the content of the flowfile is stuck: it is not 
> committed, so it never reaches downstream processors, and it is not rolled 
> back, so it never returns to the QueryRecord input queue to be reprocessed. 
> This can result in data loss for the contents of the stuck flowfile.
> Specifically, this has been observed with QueryRecord, GrokReader, and 
> FreeFormTextRecordSetWriter.
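
The failure mode described above can be illustrated with a toy model. The sketch below does not use the NiFi API: `ToySession`, `processLeaky`, `processClosing`, and `commitOrRollback` are hypothetical names invented for illustration. It assumes only what the report states: a commit is rejected while a flowfile is still open for reading, and a failed commit should be rolled back so the content returns to the input queue.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.Deque;

/** Toy illustration only; none of these types are part of NiFi. */
public class SessionSketch {

    /** Hypothetical session that tracks read streams that are still open. */
    public static final class ToySession {
        private final Deque<String> inputQueue = new ArrayDeque<>();
        private final Deque<String> downstream = new ArrayDeque<>();
        private String inFlight;   // content taken from the queue, not yet committed
        private int openReaders;   // read streams opened but not yet closed

        public void enqueue(String content) { inputQueue.add(content); }
        public void get() { inFlight = inputQueue.poll(); }
        public int queued() { return inputQueue.size(); }
        public int delivered() { return downstream.size(); }

        /** Opens a read stream over the in-flight content and counts it until closed. */
        public InputStream read() {
            openReaders++;
            return new ByteArrayInputStream(inFlight.getBytes(StandardCharsets.UTF_8)) {
                private boolean closed;
                @Override public void close() throws IOException {
                    if (!closed) { closed = true; openReaders--; }
                    super.close();
                }
            };
        }

        /** Rejects the commit while any read stream is open (message copied from the logs). */
        public void commit() {
            if (openReaders > 0) {
                throw new IllegalStateException("Cannot commit session while reading from FlowFile");
            }
            downstream.add(inFlight);
            inFlight = null;
        }

        /** Toy rollback: forget open streams and put the content back at the head of the queue. */
        public void rollback() {
            openReaders = 0;
            if (inFlight != null) { inputQueue.addFirst(inFlight); inFlight = null; }
        }
    }

    /** Commits, or rolls back if the commit is rejected. Returns true when committed. */
    public static boolean commitOrRollback(ToySession s) {
        try {
            s.commit();
            return true;
        } catch (IllegalStateException e) {
            s.rollback();   // without this step the content would be stuck in flight
            return false;
        }
    }

    /** Opens a read stream and never closes it, so the commit is rejected. */
    public static boolean processLeaky(ToySession s) {
        s.get();
        s.read();           // stream intentionally leaked
        return commitOrRollback(s);
    }

    /** Closes the read stream with try-with-resources before committing. */
    public static boolean processClosing(ToySession s) {
        s.get();
        try (InputStream in = s.read()) {
            while (in.read() != -1) { /* consume the content */ }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return commitOrRollback(s);
    }

    public static void main(String[] args) {
        ToySession s = new ToySession();
        s.enqueue("example line");
        System.out.println("leaky committed: " + processLeaky(s));
        System.out.println("closing committed: " + processClosing(s));
    }
}
```

In this model the data-loss case from the report corresponds to a rejected commit that is never followed by `rollback()`: the content stays in `inFlight`, so it is in neither the input queue nor the downstream queue.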
> *Attached is an example configuration of QueryRecord (including record 
> reader/writer) that can result in this behavior.*
> During the period observed, data volume was about 40-78 million records per 
> hour, with the uncaught exceptions occurring only occasionally (between 0 and 
> 10 times per hour). Not all of those uncaught exceptions resulted in data 
> loss or a failed rollback, so that outcome is even rarer. 
> *Data characteristics:*
> * For the given QueryRecord config that has include and exclude queries, both 
> queries are expected to match some lines in the input flowfile.
> * The flowfile is also expected to contain some unmatched lines, such as 
> empty lines, lines of text that don't match either query, or corrupted / 
> binary lines that don't decode to ASCII intermixed with plaintext lines.
> Example data can be provided upon request.
> Here are the stack traces for how this issue appears on NiFi 1.16.1:
> {noformat}
> 2023-01-19T15:22:58.798926992Z stdout F 2023-01-19 15:22:58,797 ERROR 
> [Timer-Driven Process Thread-5] o.a.nifi.processors.standard.QueryRecord 
> QueryRecord[id=98518d51-88d8-3000-a38e-d629589cfd5a] Processing halted: 
> yielding [1 sec]
> 2023-01-19T15:22:58.798933858Z stdout F java.lang.IllegalStateException: 
> Cannot commit session while reading from FlowFile
> 2023-01-19T15:22:58.798940864Z stdout F       at 
> org.apache.nifi.controller.repository.StandardProcessSession.validateCommitState(StandardProcessSession.java:241)
> 2023-01-19T15:22:58.798957184Z stdout F       at 
> org.apache.nifi.controller.repository.StandardProcessSession.checkpoint(StandardProcessSession.java:270)
> 2023-01-19T15:22:58.798960616Z stdout F       at 
> org.apache.nifi.controller.repository.StandardProcessSession.checkpoint(StandardProcessSession.java:233)
> 2023-01-19T15:22:58.798963703Z stdout F       at 
> org.apache.nifi.controller.repository.BatchingSessionFactory$HighThroughputSession.commit(BatchingSessionFactory.java:69)
> 2023-01-19T15:22:58.798967659Z stdout F       at 
> org.apache.nifi.controller.repository.BatchingSessionFactory$HighThroughputSession.commitAsync(BatchingSessionFactory.java:74)
> 2023-01-19T15:22:58.798970816Z stdout F       at 
> org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:28)
> 2023-01-19T15:22:58.798974237Z stdout F       at 
> org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1354)
> 2023-01-19T15:22:58.798978213Z stdout F       at 
> org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:214)
> 2023-01-19T15:22:58.79899325Z stdout F        at 
> org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:103)
> 2023-01-19T15:22:58.798996758Z stdout F       at 
> org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
> 2023-01-19T15:22:58.799000774Z stdout F       at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> 2023-01-19T15:22:58.799004017Z stdout F       at 
> java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
> 2023-01-19T15:22:58.799007704Z stdout F       at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
> 2023-01-19T15:22:58.799011065Z stdout F       at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
> 2023-01-19T15:22:58.799014058Z stdout F       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> 2023-01-19T15:22:58.79901729Z stdout F        at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> 2023-01-19T15:22:58.799022956Z stdout F       at 
> java.lang.Thread.run(Thread.java:750)
> 2023-01-19T15:22:58.799026908Z stdout F 2023-01-19 15:22:58,798 WARN 
> [Timer-Driven Process Thread-5] o.a.n.controller.tasks.ConnectableTask 
> Processing halted: uncaught exception in Component 
> [QueryRecord[id=98518d51-88d8-3000-a38e-d629589cfd5a]]
> 2023-01-19T15:22:58.799031354Z stdout F java.lang.IllegalStateException: 
> Cannot commit session while reading from FlowFile
> 2023-01-19T15:22:58.799034326Z stdout F       at 
> org.apache.nifi.controller.repository.StandardProcessSession.validateCommitState(StandardProcessSession.java:241)
> 2023-01-19T15:22:58.799037922Z stdout F       at 
> org.apache.nifi.controller.repository.StandardProcessSession.checkpoint(StandardProcessSession.java:270)
> 2023-01-19T15:22:58.799041102Z stdout F       at 
> org.apache.nifi.controller.repository.StandardProcessSession.checkpoint(StandardProcessSession.java:233)
> 2023-01-19T15:22:58.799044444Z stdout F       at 
> org.apache.nifi.controller.repository.BatchingSessionFactory$HighThroughputSession.commit(BatchingSessionFactory.java:69)
> 2023-01-19T15:22:58.799052944Z stdout F       at 
> org.apache.nifi.controller.repository.BatchingSessionFactory$HighThroughputSession.commitAsync(BatchingSessionFactory.java:74)
> 2023-01-19T15:22:58.799062978Z stdout F       at 
> org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:28)
> 2023-01-19T15:22:58.799068448Z stdout F       at 
> org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1354)
> 2023-01-19T15:22:58.799071408Z stdout F       at 
> org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:214)
> 2023-01-19T15:22:58.799074734Z stdout F       at 
> org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:103)
> 2023-01-19T15:22:58.799077893Z stdout F       at 
> org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
> 2023-01-19T15:22:58.799141969Z stdout F       at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> 2023-01-19T15:22:58.799151867Z stdout F       at 
> java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
> 2023-01-19T15:22:58.799158862Z stdout F       at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
> 2023-01-19T15:22:58.799162739Z stdout F       at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
> 2023-01-19T15:22:58.799166604Z stdout F       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> 2023-01-19T15:22:58.799169817Z stdout F       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> 2023-01-19T15:22:58.799173215Z stdout F       at 
> java.lang.Thread.run(Thread.java:750)
> 2023-01-19T15:22:58.799176284Z stdout F 2023-01-19 15:22:58,798 ERROR 
> [Timer-Driven Process Thread-5] o.a.n.c.r.StandardProcessSession Failed to 
> asynchronously commit session StandardProcessSession[id=207236052] for 
> QueryRecord[id=98518d51-88d8-3000-a38e-d629589cfd5a]
> 2023-01-19T15:22:58.799182572Z stdout F java.lang.IllegalStateException: 
> Cannot commit session while reading from FlowFile
> 2023-01-19T15:22:58.799223279Z stdout F       at 
> org.apache.nifi.controller.repository.StandardProcessSession.validateCommitState(StandardProcessSession.java:241)
> 2023-01-19T15:22:58.799227845Z stdout F       at 
> org.apache.nifi.controller.repository.StandardProcessSession.checkpoint(StandardProcessSession.java:270)
> 2023-01-19T15:22:58.799240291Z stdout F       at 
> org.apache.nifi.controller.repository.StandardProcessSession.commit(StandardProcessSession.java:547)
> 2023-01-19T15:22:58.799243912Z stdout F       at 
> org.apache.nifi.controller.repository.StandardProcessSession.commitAsync(StandardProcessSession.java:518)
> 2023-01-19T15:22:58.799247042Z stdout F       at 
> org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:263)
> 2023-01-19T15:22:58.799250482Z stdout F       at 
> org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:103)
> 2023-01-19T15:22:58.799253892Z stdout F       at 
> org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
> 2023-01-19T15:22:58.799257325Z stdout F       at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> 2023-01-19T15:22:58.799269651Z stdout F       at 
> java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
> 2023-01-19T15:22:58.799273432Z stdout F       at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
> 2023-01-19T15:22:58.799276577Z stdout F       at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
> 2023-01-19T15:22:58.79927986Z stdout F        at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> 2023-01-19T15:22:58.799283817Z stdout F       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> 2023-01-19T15:22:58.799287269Z stdout F       at 
> java.lang.Thread.run(Thread.java:750)
> 2023-01-19T15:22:58.799291372Z stdout F 2023-01-19 15:22:58,798 ERROR 
> [Timer-Driven Process Thread-5] o.a.nifi.processors.standard.QueryRecord 
> [QueryRecord[id=98518d51-88d8-3000-a38e-d629589cfd5a], 
> StandardProcessSession[id=207236052], java.lang.IllegalStateException: Cannot 
> commit session while reading from FlowFile] Failed to commit session {} due 
> to {}; rolling back
> 2023-01-19T15:22:58.799295329Z stdout F java.lang.IllegalStateException: 
> Cannot commit session while reading from FlowFile
> 2023-01-19T15:22:58.799301357Z stdout F       at 
> org.apache.nifi.controller.repository.StandardProcessSession.validateCommitState(StandardProcessSession.java:241)
> 2023-01-19T15:22:58.799304456Z stdout F       at 
> org.apache.nifi.controller.repository.StandardProcessSession.checkpoint(StandardProcessSession.java:270)
> 2023-01-19T15:22:58.799308659Z stdout F       at 
> org.apache.nifi.controller.repository.StandardProcessSession.commit(StandardProcessSession.java:547)
> 2023-01-19T15:22:58.799311987Z stdout F       at 
> org.apache.nifi.controller.repository.StandardProcessSession.commitAsync(StandardProcessSession.java:518)
> 2023-01-19T15:22:58.799319288Z stdout F       at 
> org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:263)
> 2023-01-19T15:22:58.799322307Z stdout F       at 
> org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:103)
> 2023-01-19T15:22:58.799325425Z stdout F       at 
> org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
> 2023-01-19T15:22:58.799328575Z stdout F       at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> 2023-01-19T15:22:58.799333103Z stdout F       at 
> java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
> 2023-01-19T15:22:58.799336563Z stdout F       at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
> 2023-01-19T15:22:58.799339581Z stdout F       at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
> 2023-01-19T15:22:58.799358282Z stdout F       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> 2023-01-19T15:22:58.799361433Z stdout F       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> 2023-01-19T15:22:58.799364528Z stdout F       at 
> java.lang.Thread.run(Thread.java:750)
> 2023-01-19T15:22:58.7993679Z stdout F 2023-01-19 15:22:58,798 ERROR 
> [Timer-Driven Process Thread-5] org.apache.nifi.engine.FlowEngine Uncaught 
> Exception in Runnable task
> 2023-01-19T15:22:58.799373562Z stdout F java.lang.IllegalStateException: 
> Cannot commit session while reading from FlowFile
> 2023-01-19T15:23:04.67041632Z stdout F        at 
> org.apache.nifi.controller.repository.StandardProcessSession.validateCommitState(StandardProcessSession.java:241)
> 2023-01-19T15:23:04.670436894Z stdout F       at 
> org.apache.nifi.controller.repository.StandardProcessSession.checkpoint(StandardProcessSession.java:270)
> 2023-01-19T15:23:04.670440419Z stdout F       at 
> org.apache.nifi.controller.repository.StandardProcessSession.commit(StandardProcessSession.java:547)
> 2023-01-19T15:23:04.670443721Z stdout F       at 
> org.apache.nifi.controller.repository.StandardProcessSession.commitAsync(StandardProcessSession.java:518)
> 2023-01-19T15:23:04.670446305Z stdout F       at 
> org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:263)
> 2023-01-19T15:23:04.670449314Z stdout F       at 
> org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:103)
> 2023-01-19T15:23:04.670451763Z stdout F       at 
> org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
> 2023-01-19T15:23:04.670455784Z stdout F       at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> 2023-01-19T15:23:04.670460754Z stdout F       at 
> java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
> 2023-01-19T15:23:04.670463082Z stdout F       at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
> 2023-01-19T15:23:04.670466555Z stdout F       at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
> 2023-01-19T15:23:04.670468728Z stdout F       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> 2023-01-19T15:23:04.6704711Z stdout F         at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> 2023-01-19T15:23:04.670473527Z stdout F       at 
> java.lang.Thread.run(Thread.java:750)
> {noformat}
> Here is the same or a similar issue in NiFi 1.13.2:
> {noformat}
> 2022-12-30 23:01:34,002 ERROR 
> org.apache.nifi.processors.standard.QueryRecord: 
> QueryRecord[id=f33a33f8-a74d-3eb9-b3fe-ae68197c9253] 
> QueryRecord[id=f33a33f8-a74d-3eb9-b3fe-ae68197c9253] failed to process 
> session due to java.lang.IllegalStateException; Processor Administratively 
> Yielded for 1 sec: java.lang.IllegalStateException
> java.lang.IllegalStateException: null
>       at 
> org.apache.nifi.controller.repository.StandardProcessSession.checkpoint(StandardProcessSession.java:231)
>       at 
> org.apache.nifi.controller.repository.StandardProcessSession.checkpoint(StandardProcessSession.java:219)
>       at 
> org.apache.nifi.controller.repository.BatchingSessionFactory$HighThroughputSession.commit(BatchingSessionFactory.java:65)
>       at 
> org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:28)
>       at 
> org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1173)
>       at 
> org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:214)
>       at 
> org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:103)
>       at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
>       at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>       at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>       at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>       at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>       at java.lang.Thread.run(Thread.java:750)
> 2022-12-30 23:01:34,002 WARN 
> org.apache.nifi.controller.tasks.ConnectableTask: Administratively Yielding 
> QueryRecord[id=f33a33f8-a74d-3eb9-b3fe-ae68197c9253] due to uncaught 
> Exception: java.lang.IllegalStateException
> java.lang.IllegalStateException: null
>       at 
> org.apache.nifi.controller.repository.StandardProcessSession.checkpoint(StandardProcessSession.java:231)
>       at 
> org.apache.nifi.controller.repository.StandardProcessSession.checkpoint(StandardProcessSession.java:219)
>       at 
> org.apache.nifi.controller.repository.BatchingSessionFactory$HighThroughputSession.commit(BatchingSessionFactory.java:65)
>       at 
> org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:28)
>       at 
> org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1173)
>       at 
> org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:214)
>       at 
> org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:103)
>       at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
>       at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>       at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>       at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>       at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>       at java.lang.Thread.run(Thread.java:750)
> 2022-12-30 23:01:34,002 ERROR 
> org.apache.nifi.processors.standard.QueryRecord: 
> QueryRecord[id=f33a33f8-a74d-3eb9-b3fe-ae68197c9253] Failed to commit session 
> StandardProcessSession[id=676435539744] due to 
> java.lang.IllegalStateException; rolling back: java.lang.IllegalStateException
> java.lang.IllegalStateException: null
>       at 
> org.apache.nifi.controller.repository.StandardProcessSession.checkpoint(StandardProcessSession.java:231)
>       at 
> org.apache.nifi.controller.repository.StandardProcessSession.commit(StandardProcessSession.java:333)
>       at 
> org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:264)
>       at 
> org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:103)
>       at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
>       at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>       at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>       at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>       at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>       at java.lang.Thread.run(Thread.java:750)
> {noformat}
> cc [~exceptionfactory] [~markap14]



