Branko Peshevski created NIFI-7072:
--------------------------------------

             Summary: ForkRecord in Extract mode fails if concurrency is increased.
                 Key: NIFI-7072
                 URL: https://issues.apache.org/jira/browse/NIFI-7072
             Project: Apache NiFi
          Issue Type: Bug
          Components: Core Framework
    Affects Versions: 1.11.0, 1.9.2, 1.10.0
         Environment: Java 8, macOS
            Reporter: Branko Peshevski
         Attachments: ForkRecord_concurrency_bug.xml

I followed the ForkRecord example for multi-nested arrays from the [additional details page|https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-standard-nar/1.9.2/org.apache.nifi.processors.standard.ForkRecord/additionalDetails.html] and found that when the concurrency is increased, the processor fails with a content repository write error ("Stream is closed"):

{code:java}
2020-01-28 18:59:07,485 ERROR [Timer-Driven Process Thread-5] o.a.n.c.r.StandardProcessSession Failed to write content to StandardFlowFileRecord[uuid=d55d9aa0-6b0b-4f6d-aca6-b340467cf29e,claim=,offset=0,name=f15480b3-57da-4e3e-9f5d-5cf2839bc2bd,size=0]; rolling back session
java.io.IOException: Stream is closed
    at org.apache.nifi.controller.repository.FileSystemRepository$ContentRepositoryOutputStream.write(FileSystemRepository.java:1855)
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
    at org.apache.nifi.controller.repository.claim.ContentClaimWriteCache$1.write(ContentClaimWriteCache.java:101)
    at org.apache.nifi.controller.repository.io.DisableOnCloseOutputStream.write(DisableOnCloseOutputStream.java:49)
    at org.apache.nifi.stream.io.ByteCountingOutputStream.write(ByteCountingOutputStream.java:49)
    at org.apache.nifi.controller.repository.StandardProcessSession$7.write(StandardProcessSession.java:2636)
    at org.apache.nifi.controller.repository.io.TaskTerminationOutputStream.write(TaskTerminationOutputStream.java:62)
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
    at org.apache.avro.io.BufferedBinaryEncoder$OutputStreamSink.innerWrite(BufferedBinaryEncoder.java:216)
    at org.apache.avro.io.BufferedBinaryEncoder.writeFixed(BufferedBinaryEncoder.java:150)
    at org.apache.avro.io.BlockingBinaryEncoder.flush(BlockingBinaryEncoder.java:244)
    at org.apache.nifi.avro.WriteAvroResultWithExternalSchema.flush(WriteAvroResultWithExternalSchema.java:94)
    at org.apache.nifi.avro.WriteAvroResultWithExternalSchema.onFinishRecordSet(WriteAvroResultWithExternalSchema.java:74)
    at org.apache.nifi.serialization.AbstractRecordSetWriter.finishRecordSet(AbstractRecordSetWriter.java:91)
    at org.apache.nifi.processors.standard.ForkRecord$1.process(ForkRecord.java:319)
    at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:2315)
    at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:2283)
    at org.apache.nifi.processors.standard.ForkRecord.onTrigger(ForkRecord.java:238)
    at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
    at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1176)
    at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:213)
    at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
    at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
2020-01-28 18:59:07,485 ERROR [Timer-Driven Process Thread-5] o.a.nifi.processors.standard.ForkRecord ForkRecord[id=4e463376-2d56-1d09-dbf6-1cf419144e8a] Failed to fork StandardFlowFileRecord[uuid=82366dc4-e256-40c4-aec9-d5c681d0ba16,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1580234187165-76713, container=default, section=937], offset=487914, length=81319],offset=0,name=82366dc4-e256-40c4-aec9-d5c681d0ba16,size=81319]: org.apache.nifi.processor.exception.FlowFileAccessException: Failed to write to Content Repository for StandardFlowFileRecord[uuid=d55d9aa0-6b0b-4f6d-aca6-b340467cf29e,claim=,offset=0,name=f15480b3-57da-4e3e-9f5d-5cf2839bc2bd,size=0]
org.apache.nifi.processor.exception.FlowFileAccessException: Failed to write to Content Repository for StandardFlowFileRecord[uuid=d55d9aa0-6b0b-4f6d-aca6-b340467cf29e,claim=,offset=0,name=f15480b3-57da-4e3e-9f5d-5cf2839bc2bd,size=0]
    at org.apache.nifi.controller.repository.StandardProcessSession$7.write(StandardProcessSession.java:2641)
    at org.apache.nifi.controller.repository.io.TaskTerminationOutputStream.write(TaskTerminationOutputStream.java:62)
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
    at org.apache.avro.io.BufferedBinaryEncoder$OutputStreamSink.innerWrite(BufferedBinaryEncoder.java:216)
    at org.apache.avro.io.BufferedBinaryEncoder.writeFixed(BufferedBinaryEncoder.java:150)
    at org.apache.avro.io.BlockingBinaryEncoder.flush(BlockingBinaryEncoder.java:244)
    at org.apache.nifi.avro.WriteAvroResultWithExternalSchema.flush(WriteAvroResultWithExternalSchema.java:94)
    at org.apache.nifi.avro.WriteAvroResultWithExternalSchema.onFinishRecordSet(WriteAvroResultWithExternalSchema.java:74)
    at org.apache.nifi.serialization.AbstractRecordSetWriter.finishRecordSet(AbstractRecordSetWriter.java:91)
    at org.apache.nifi.processors.standard.ForkRecord$1.process(ForkRecord.java:319)
    at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:2315)
    at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:2283)
    at org.apache.nifi.processors.standard.ForkRecord.onTrigger(ForkRecord.java:238)
    at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
    at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1176)
    at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:213)
    at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
    at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Stream is closed
{code}

I have attached a template that reproduces the bug. The JSON data from the example was modified so that one of the accounts has more transactions, which makes the bug easier to hit.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)