[
https://issues.apache.org/jira/browse/NIFI-7072?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17025879#comment-17025879
]
Branko Peshevski commented on NIFI-7072:
----------------------------------------
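The BinaryEncoder instances returned by
EncoderFactory.get().blockingBinaryEncoder(OutputStream out, BinaryEncoder
reuse) are not thread-safe. If a single encoder is reused across concurrently
running tasks, one task can flush buffered data into another task's
already-closed output stream. That is consistent with the "Stream is closed"
error in the stack trace below.
https://avro.apache.org/docs/1.8.1/api/java/org/apache/avro/io/EncoderFactory.html#blockingBinaryEncoder(java.io.OutputStream,%20org.apache.avro.io.BinaryEncoder)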
> ForkRecord in Extract mode fails if concurrency is increased.
> -------------------------------------------------------------
>
> Key: NIFI-7072
> URL: https://issues.apache.org/jira/browse/NIFI-7072
> Project: Apache NiFi
> Issue Type: Bug
> Components: Core Framework
> Affects Versions: 1.10.0, 1.9.2, 1.11.0
> Environment: Java 8, macOS
> Reporter: Branko Peshevski
> Priority: Major
> Attachments: ForkRecord_concurrency_bug.xml
>
>
> I followed the ForkRecord example for multi-nested arrays from the [additional details
> page|https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-standard-nar/1.9.2/org.apache.nifi.processors.standard.ForkRecord/additionalDetails.html]
> and discovered that when the processor's concurrency is increased, the
> processor fails while writing to the content repository.
> {code:java}
> 2020-01-28 18:59:07,485 ERROR [Timer-Driven Process Thread-5]
> o.a.n.c.r.StandardProcessSession Failed to write content to
> StandardFlowFileRecord[uuid=d55d9aa0-6b0b-4f6d-aca6-b340467cf29e,claim=,offset=0,name=f15480b3-57da-4e3e-9f5d-5cf2839bc2bd,size=0];
> rolling back session java.io.IOException: Stream is closed at
> org.apache.nifi.controller.repository.FileSystemRepository$ContentRepositoryOutputStream.write(FileSystemRepository.java:1855)
> at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122) at
> org.apache.nifi.controller.repository.claim.ContentClaimWriteCache$1.write(ContentClaimWriteCache.java:101)
> at
> org.apache.nifi.controller.repository.io.DisableOnCloseOutputStream.write(DisableOnCloseOutputStream.java:49)
> at
> org.apache.nifi.stream.io.ByteCountingOutputStream.write(ByteCountingOutputStream.java:49)
> at
> org.apache.nifi.controller.repository.StandardProcessSession$7.write(StandardProcessSession.java:2636)
> at
> org.apache.nifi.controller.repository.io.TaskTerminationOutputStream.write(TaskTerminationOutputStream.java:62)
> at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122) at
> org.apache.avro.io.BufferedBinaryEncoder$OutputStreamSink.innerWrite(BufferedBinaryEncoder.java:216)
> at
> org.apache.avro.io.BufferedBinaryEncoder.writeFixed(BufferedBinaryEncoder.java:150)
> at
> org.apache.avro.io.BlockingBinaryEncoder.flush(BlockingBinaryEncoder.java:244)
> at
> org.apache.nifi.avro.WriteAvroResultWithExternalSchema.flush(WriteAvroResultWithExternalSchema.java:94)
> at
> org.apache.nifi.avro.WriteAvroResultWithExternalSchema.onFinishRecordSet(WriteAvroResultWithExternalSchema.java:74)
> at
> org.apache.nifi.serialization.AbstractRecordSetWriter.finishRecordSet(AbstractRecordSetWriter.java:91)
> at
> org.apache.nifi.processors.standard.ForkRecord$1.process(ForkRecord.java:319)
> at
> org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:2315)
> at
> org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:2283)
> at
> org.apache.nifi.processors.standard.ForkRecord.onTrigger(ForkRecord.java:238)
> at
> org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
> at
> org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1176)
> at
> org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:213)
> at
> org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
> at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110) at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at
> java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748) 2020-01-28 18:59:07,485 ERROR
> [Timer-Driven Process Thread-5] o.a.nifi.processors.standard.ForkRecord
> ForkRecord[id=4e463376-2d56-1d09-dbf6-1cf419144e8a] Failed to fork
> StandardFlowFileRecord[uuid=82366dc4-e256-40c4-aec9-d5c681d0ba16,claim=StandardContentClaim
> [resourceClaim=StandardResourceClaim[id=1580234187165-76713,
> container=default, section=937], offset=487914,
> length=81319],offset=0,name=82366dc4-e256-40c4-aec9-d5c681d0ba16,size=81319]:
> org.apache.nifi.processor.exception.FlowFileAccessException: Failed to write
> to Content Repository for
> StandardFlowFileRecord[uuid=d55d9aa0-6b0b-4f6d-aca6-b340467cf29e,claim=,offset=0,name=f15480b3-57da-4e3e-9f5d-5cf2839bc2bd,size=0]
> org.apache.nifi.processor.exception.FlowFileAccessException: Failed to write
> to Content Repository for
> StandardFlowFileRecord[uuid=d55d9aa0-6b0b-4f6d-aca6-b340467cf29e,claim=,offset=0,name=f15480b3-57da-4e3e-9f5d-5cf2839bc2bd,size=0]
> at
> org.apache.nifi.controller.repository.StandardProcessSession$7.write(StandardProcessSession.java:2641)
> at
> org.apache.nifi.controller.repository.io.TaskTerminationOutputStream.write(TaskTerminationOutputStream.java:62)
> at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122) at
> org.apache.avro.io.BufferedBinaryEncoder$OutputStreamSink.innerWrite(BufferedBinaryEncoder.java:216)
> at
> org.apache.avro.io.BufferedBinaryEncoder.writeFixed(BufferedBinaryEncoder.java:150)
> at
> org.apache.avro.io.BlockingBinaryEncoder.flush(BlockingBinaryEncoder.java:244)
> at
> org.apache.nifi.avro.WriteAvroResultWithExternalSchema.flush(WriteAvroResultWithExternalSchema.java:94)
> at
> org.apache.nifi.avro.WriteAvroResultWithExternalSchema.onFinishRecordSet(WriteAvroResultWithExternalSchema.java:74)
> at
> org.apache.nifi.serialization.AbstractRecordSetWriter.finishRecordSet(AbstractRecordSetWriter.java:91)
> at
> org.apache.nifi.processors.standard.ForkRecord$1.process(ForkRecord.java:319)
> at
> org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:2315)
> at
> org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:2283)
> at
> org.apache.nifi.processors.standard.ForkRecord.onTrigger(ForkRecord.java:238)
> at
> org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
> at
> org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1176)
> at
> org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:213)
> at
> org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
> at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110) at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at
> java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748) Caused by: java.io.IOException:
> Stream is closed
> {code}
> I have attached a template that reproduces the bug. I modified the JSON
> data from the example so that one of the accounts has more transactions,
> which makes the bug easier to hit.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)