Branko Peshevski created NIFI-7072:
--------------------------------------
Summary: ForkRecord in Extract mode fails if concurrency is increased
Key: NIFI-7072
URL: https://issues.apache.org/jira/browse/NIFI-7072
Project: Apache NiFi
Issue Type: Bug
Components: Core Framework
Affects Versions: 1.11.0, 1.9.2, 1.10.0
Environment: Java 8, macOS
Reporter: Branko Peshevski
Attachments: ForkRecord_concurrency_bug.xml
I followed the ForkRecord example for multi-nested arrays from the [additional details
page|https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-standard-nar/1.9.2/org.apache.nifi.processors.standard.ForkRecord/additionalDetails.html]
and found that when the processor's concurrency (Concurrent Tasks) is increased in Extract mode,
the processor fails while writing to the content repository with the following errors:
{code:java}
2020-01-28 18:59:07,485 ERROR [Timer-Driven Process Thread-5] o.a.n.c.r.StandardProcessSession Failed to write content to StandardFlowFileRecord[uuid=d55d9aa0-6b0b-4f6d-aca6-b340467cf29e,claim=,offset=0,name=f15480b3-57da-4e3e-9f5d-5cf2839bc2bd,size=0]; rolling back session
java.io.IOException: Stream is closed
	at org.apache.nifi.controller.repository.FileSystemRepository$ContentRepositoryOutputStream.write(FileSystemRepository.java:1855)
	at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
	at org.apache.nifi.controller.repository.claim.ContentClaimWriteCache$1.write(ContentClaimWriteCache.java:101)
	at org.apache.nifi.controller.repository.io.DisableOnCloseOutputStream.write(DisableOnCloseOutputStream.java:49)
	at org.apache.nifi.stream.io.ByteCountingOutputStream.write(ByteCountingOutputStream.java:49)
	at org.apache.nifi.controller.repository.StandardProcessSession$7.write(StandardProcessSession.java:2636)
	at org.apache.nifi.controller.repository.io.TaskTerminationOutputStream.write(TaskTerminationOutputStream.java:62)
	at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
	at org.apache.avro.io.BufferedBinaryEncoder$OutputStreamSink.innerWrite(BufferedBinaryEncoder.java:216)
	at org.apache.avro.io.BufferedBinaryEncoder.writeFixed(BufferedBinaryEncoder.java:150)
	at org.apache.avro.io.BlockingBinaryEncoder.flush(BlockingBinaryEncoder.java:244)
	at org.apache.nifi.avro.WriteAvroResultWithExternalSchema.flush(WriteAvroResultWithExternalSchema.java:94)
	at org.apache.nifi.avro.WriteAvroResultWithExternalSchema.onFinishRecordSet(WriteAvroResultWithExternalSchema.java:74)
	at org.apache.nifi.serialization.AbstractRecordSetWriter.finishRecordSet(AbstractRecordSetWriter.java:91)
	at org.apache.nifi.processors.standard.ForkRecord$1.process(ForkRecord.java:319)
	at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:2315)
	at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:2283)
	at org.apache.nifi.processors.standard.ForkRecord.onTrigger(ForkRecord.java:238)
	at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
	at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1176)
	at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:213)
	at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
	at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
2020-01-28 18:59:07,485 ERROR [Timer-Driven Process Thread-5] o.a.nifi.processors.standard.ForkRecord ForkRecord[id=4e463376-2d56-1d09-dbf6-1cf419144e8a] Failed to fork StandardFlowFileRecord[uuid=82366dc4-e256-40c4-aec9-d5c681d0ba16,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1580234187165-76713, container=default, section=937], offset=487914, length=81319],offset=0,name=82366dc4-e256-40c4-aec9-d5c681d0ba16,size=81319]: org.apache.nifi.processor.exception.FlowFileAccessException: Failed to write to Content Repository for StandardFlowFileRecord[uuid=d55d9aa0-6b0b-4f6d-aca6-b340467cf29e,claim=,offset=0,name=f15480b3-57da-4e3e-9f5d-5cf2839bc2bd,size=0]
org.apache.nifi.processor.exception.FlowFileAccessException: Failed to write to Content Repository for StandardFlowFileRecord[uuid=d55d9aa0-6b0b-4f6d-aca6-b340467cf29e,claim=,offset=0,name=f15480b3-57da-4e3e-9f5d-5cf2839bc2bd,size=0]
	at org.apache.nifi.controller.repository.StandardProcessSession$7.write(StandardProcessSession.java:2641)
	at org.apache.nifi.controller.repository.io.TaskTerminationOutputStream.write(TaskTerminationOutputStream.java:62)
	at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
	at org.apache.avro.io.BufferedBinaryEncoder$OutputStreamSink.innerWrite(BufferedBinaryEncoder.java:216)
	at org.apache.avro.io.BufferedBinaryEncoder.writeFixed(BufferedBinaryEncoder.java:150)
	at org.apache.avro.io.BlockingBinaryEncoder.flush(BlockingBinaryEncoder.java:244)
	at org.apache.nifi.avro.WriteAvroResultWithExternalSchema.flush(WriteAvroResultWithExternalSchema.java:94)
	at org.apache.nifi.avro.WriteAvroResultWithExternalSchema.onFinishRecordSet(WriteAvroResultWithExternalSchema.java:74)
	at org.apache.nifi.serialization.AbstractRecordSetWriter.finishRecordSet(AbstractRecordSetWriter.java:91)
	at org.apache.nifi.processors.standard.ForkRecord$1.process(ForkRecord.java:319)
	at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:2315)
	at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:2283)
	at org.apache.nifi.processors.standard.ForkRecord.onTrigger(ForkRecord.java:238)
	at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
	at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1176)
	at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:213)
	at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
	at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Stream is closed
{code}
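Both log entries end in the same symptom: ForkRecord$1.process calls finishRecordSet(), the Avro writer flushes, and the underlying content-repository stream rejects the write because it is already closed. This report does not identify the root cause. As a purely hypothetical illustration of that symptom class, the sketch below shows how two concurrent tasks sharing one output stream produce "Stream is closed" when one task closes the stream before the other finishes writing. ClaimStream and SharedStreamRace are invented names for this sketch and are not NiFi code; a latch forces the bad interleaving so the result is deterministic.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SharedStreamRace {

    // Hypothetical stand-in for a content-repository stream: it rejects
    // writes once closed, with the same message seen in the trace.
    static final class ClaimStream extends OutputStream {
        private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        private volatile boolean closed = false;

        @Override
        public void write(int b) throws IOException {
            if (closed) {
                throw new IOException("Stream is closed");
            }
            buffer.write(b);
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    // Runs two tasks against ONE shared stream. The latch makes task B
    // write only after task A has closed the stream.
    static String runSharedStreamScenario() throws Exception {
        final ClaimStream shared = new ClaimStream();
        final CountDownLatch aClosed = new CountDownLatch(1);
        final ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            Future<?> taskA = pool.submit(() -> {
                try (OutputStream out = shared) {
                    out.write(new byte[] {1, 2, 3});
                } finally {
                    // Runs after try-with-resources has closed the stream.
                    aClosed.countDown();
                }
                return null;
            });
            Future<String> taskB = pool.submit(() -> {
                aClosed.await();
                try {
                    shared.write(new byte[] {4, 5, 6});
                    return "ok";
                } catch (IOException e) {
                    return e.getMessage();
                }
            });
            taskA.get();
            return taskB.get();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runSharedStreamScenario()); // prints "Stream is closed"
    }
}
```

Giving each task its own stream (or serializing the tasks) removes the failure in this toy model; whether anything similar applies inside ForkRecord would need to be confirmed against the processor source.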
The attached template (ForkRecord_concurrency_bug.xml) reproduces the bug. The JSON
data from the example was modified so that one of the accounts has more transactions,
which makes the bug easier to hit.
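If the attached template is not at hand, a similarly skewed input can be generated. The following is a minimal sketch, assuming the multi-nested-array example uses customers with an "accounts" array whose entries hold a "transactions" array of objects with "id" and "amount" fields; check those field names against the example before use. ForkRecordTestData is a hypothetical helper, and all values it emits are synthetic.

```java
import java.util.Locale;

public class ForkRecordTestData {

    // Builds one customer whose first account holds `manyTx` transactions and
    // whose second account holds one. Field names are assumptions modelled on
    // the ForkRecord multi-nested-array example.
    static String buildCustomerJson(int manyTx) {
        StringBuilder sb = new StringBuilder();
        sb.append("[{\"id\":1,\"name\":\"John Doe\",\"accounts\":[");
        sb.append(account(42, 1000.0, 1, manyTx));
        sb.append(',');
        sb.append(account(43, 250.0, manyTx + 1, 1));
        sb.append("]}]");
        return sb.toString();
    }

    // One account object with `count` transactions, ids starting at `firstTxId`.
    private static String account(int id, double balance, int firstTxId, int count) {
        StringBuilder sb = new StringBuilder();
        sb.append(String.format(Locale.ROOT,
                "{\"id\":%d,\"balance\":%.2f,\"transactions\":[", id, balance));
        for (int i = 0; i < count; i++) {
            if (i > 0) {
                sb.append(',');
            }
            // Deterministic alternating amounts so repeated runs give identical input.
            double amount = (i % 2 == 0 ? 1 : -1) * (10.0 + i);
            sb.append(String.format(Locale.ROOT,
                    "{\"id\":%d,\"amount\":%.2f}", firstTxId + i, amount));
        }
        sb.append("]}");
        return sb.toString();
    }

    public static void main(String[] args) {
        int manyTx = args.length > 0 ? Integer.parseInt(args[0]) : 5000;
        System.out.println(buildCustomerJson(manyTx));
    }
}
```

Piping the output into a file picked up by the flow and raising the processor's Concurrent Tasks should exercise the same Extract-mode path; the default of 5000 transactions is an arbitrary choice.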
--
This message was sent by Atlassian Jira
(v8.3.4#803005)