[
https://issues.apache.org/jira/browse/NIFI-3818?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Koji Kawamura updated NIFI-3818:
--------------------------------
Description:
The PutHiveStreaming processor sometimes throws an IllegalStateException with
the following stack trace, resulting in a rollback of the session.
{code}
2017-05-04 03:35:04,190 ERROR [Timer-Driven Process Thread-2] o.a.n.processors.hive.PutHiveStreaming PutHiveStreaming[id=d17c8654-015b-1000-0000-0000701816b4] org.apache.nifi.processors.hive.PutHiveStreaming$$Lambda$1254/2096694862@5bf15592 failed to process due to java.lang.IllegalStateException; rolling back session: {}
java.lang.IllegalStateException: null
    at org.apache.nifi.controller.repository.StandardProcessSession.checkpoint(StandardProcessSession.java:210)
    at org.apache.nifi.controller.repository.StandardProcessSession.commit(StandardProcessSession.java:306)
    at org.apache.nifi.processor.util.pattern.PartialFunctions.onTrigger(PartialFunctions.java:115)
    at org.apache.nifi.processor.util.pattern.RollbackOnFailure.onTrigger(RollbackOnFailure.java:184)
    at org.apache.nifi.processors.hive.PutHiveStreaming.onTrigger(PutHiveStreaming.java:582)
    at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1118)
    at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:144)
    at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
    at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:132)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
{code}
This is caused by incorrect use of ProcessSession in PutHiveStreaming, which
was introduced by NIFI-3145. The processor reads Avro records from the incoming
FlowFile and writes succeeded and failed records to outgoing FlowFiles
(success/fail).
To write the outgoing FlowFiles, the processor uses separate threads so that it
can append those Avro records without closing the OutputStream passed from the
ProcessSession.append method.
However, StandardProcessSession is not thread-safe. It holds many HashMaps,
and in particular the 'recursionSet' (a HashSet) was the cause of this issue:
when different threads add and remove entries from the set, its 'isEmpty'
method no longer works correctly, which can cause the IllegalStateException
reported in this JIRA.
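The failure mode can be pictured with a deliberately simplified model. The sketch below is not NiFi's StandardProcessSession: the class ToySession and its methods beginWrite, endWrite, checkpoint and checkpointFails are hypothetical names, and the only assumption carried over is that the checkpoint step refuses to proceed while the set of in-progress writes is non-empty. It shows, on a single thread, what checkpoint sees when an entry is still present in the set. With unsynchronized access from several threads, a plain HashSet gives no guarantee that isEmpty reflects removals made by other threads, so the same guard can trip even after every write has finished.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical, simplified stand-in for a process session; this is not NiFi code.
class ToySession {
    // Tracks FlowFiles that currently have a write in progress.
    // A plain HashSet is not safe for concurrent modification.
    private final Set<String> recursionSet = new HashSet<>();

    void beginWrite(String flowFileId) {
        recursionSet.add(flowFileId);
    }

    void endWrite(String flowFileId) {
        recursionSet.remove(flowFileId);
    }

    // Assumed guard: checkpointing while a write is in progress is illegal.
    void checkpoint() {
        if (!recursionSet.isEmpty()) {
            throw new IllegalStateException();
        }
    }

    // Returns true if checkpoint() rejects the current state.
    static boolean checkpointFails(ToySession session) {
        try {
            session.checkpoint();
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        ToySession session = new ToySession();
        session.beginWrite("success");
        session.beginWrite("failure");
        session.endWrite("success");
        // "failure" is still registered, so the guard trips.
        System.out.println(checkpointFails(session));
        session.endWrite("failure");
        // The set is empty again, so the checkpoint goes through.
        System.out.println(checkpointFails(session));
    }
}
```

In this model the exception carries no message, which would match the "IllegalStateException: null" line in the log above, but that correspondence is an inference, not something the stack trace proves.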
NIFI-3145 changed the way records are appended to outgoing FlowFiles because
previously (up to NiFi 1.1.2) the processor didn't write outgoing FlowFiles
correctly. When additional records were written, the processor added the Avro
header to the FlowFile again, so the result could no longer be read as an Avro
file. NIFI-3145 fixed that by using different threads, but it also introduced
this issue.
We need to implement logic that is single-threaded and can properly handle
appending Avro records.
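One way to picture this requirement is a writer that emits the container header exactly once per outgoing file and afterwards appends only record data, all on the calling thread. The sketch below uses a made-up text format rather than Avro. The "HDR" marker, the HeaderOnceWriter class and its methods are hypothetical, and no NiFi or Avro API is involved; it only illustrates the header-once idea, not the eventual fix.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical illustration; the format is made up and is not Avro.
class HeaderOnceWriter {
    static final String HEADER = "HDR\n";

    private final OutputStream out;
    private boolean headerWritten = false;

    HeaderOnceWriter(OutputStream out) {
        this.out = out;
    }

    // Appends one record; the header is written first only if it has not been written yet.
    void append(String record) throws IOException {
        if (!headerWritten) {
            out.write(HEADER.getBytes(StandardCharsets.UTF_8));
            headerWritten = true;
        }
        out.write((record + "\n").getBytes(StandardCharsets.UTF_8));
    }

    // Counts header marker lines in the produced content.
    static int countHeaders(String content) {
        int count = 0;
        for (String line : content.split("\n")) {
            if (line.equals("HDR")) {
                count++;
            }
        }
        return count;
    }

    // Writes all records through a single writer on the calling thread.
    static String writeAll(String... records) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        HeaderOnceWriter writer = new HeaderOnceWriter(buffer);
        try {
            for (String record : records) {
                writer.append(record);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new String(buffer.toByteArray(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String content = writeAll("r1", "r2", "r3");
        // A single header, followed by the three records.
        System.out.println(countHeaders(content));
    }
}
```

Because one writer instance owns the stream for the lifetime of the outgoing file, no second thread is needed to keep the stream open between records, and the header cannot be repeated by a later append.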
> HiveStreaming processor sometimes throws IllegalStateException resulting
> rollback
> ---------------------------------------------------------------------------------
>
> Key: NIFI-3818
> URL: https://issues.apache.org/jira/browse/NIFI-3818
> Project: Apache NiFi
> Issue Type: Bug
> Components: Extensions
> Affects Versions: 1.2.0
> Reporter: Koji Kawamura
> Assignee: Koji Kawamura
>