Branko Peshevski created NIFI-7072:
--------------------------------------

             Summary: ForkRecord in Extract mode fails if concurrency is increased.
                 Key: NIFI-7072
                 URL: https://issues.apache.org/jira/browse/NIFI-7072
             Project: Apache NiFi
          Issue Type: Bug
          Components: Core Framework
    Affects Versions: 1.11.0, 1.9.2, 1.10.0
         Environment: Java 8, macOS
            Reporter: Branko Peshevski
         Attachments: ForkRecord_concurrency_bug.xml

I followed the ForkRecord example for multi-nested arrays from the [additional details 
page|https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-standard-nar/1.9.2/org.apache.nifi.processors.standard.ForkRecord/additionalDetails.html]
 and discovered that if the processor's concurrency is increased, both the 
processor and the content repository fail with the following errors:


{code:java}
2020-01-28 18:59:07,485 ERROR [Timer-Driven Process Thread-5] o.a.n.c.r.StandardProcessSession Failed to write content to StandardFlowFileRecord[uuid=d55d9aa0-6b0b-4f6d-aca6-b340467cf29e,claim=,offset=0,name=f15480b3-57da-4e3e-9f5d-5cf2839bc2bd,size=0]; rolling back session
java.io.IOException: Stream is closed
    at org.apache.nifi.controller.repository.FileSystemRepository$ContentRepositoryOutputStream.write(FileSystemRepository.java:1855)
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
    at org.apache.nifi.controller.repository.claim.ContentClaimWriteCache$1.write(ContentClaimWriteCache.java:101)
    at org.apache.nifi.controller.repository.io.DisableOnCloseOutputStream.write(DisableOnCloseOutputStream.java:49)
    at org.apache.nifi.stream.io.ByteCountingOutputStream.write(ByteCountingOutputStream.java:49)
    at org.apache.nifi.controller.repository.StandardProcessSession$7.write(StandardProcessSession.java:2636)
    at org.apache.nifi.controller.repository.io.TaskTerminationOutputStream.write(TaskTerminationOutputStream.java:62)
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
    at org.apache.avro.io.BufferedBinaryEncoder$OutputStreamSink.innerWrite(BufferedBinaryEncoder.java:216)
    at org.apache.avro.io.BufferedBinaryEncoder.writeFixed(BufferedBinaryEncoder.java:150)
    at org.apache.avro.io.BlockingBinaryEncoder.flush(BlockingBinaryEncoder.java:244)
    at org.apache.nifi.avro.WriteAvroResultWithExternalSchema.flush(WriteAvroResultWithExternalSchema.java:94)
    at org.apache.nifi.avro.WriteAvroResultWithExternalSchema.onFinishRecordSet(WriteAvroResultWithExternalSchema.java:74)
    at org.apache.nifi.serialization.AbstractRecordSetWriter.finishRecordSet(AbstractRecordSetWriter.java:91)
    at org.apache.nifi.processors.standard.ForkRecord$1.process(ForkRecord.java:319)
    at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:2315)
    at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:2283)
    at org.apache.nifi.processors.standard.ForkRecord.onTrigger(ForkRecord.java:238)
    at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
    at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1176)
    at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:213)
    at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
    at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
2020-01-28 18:59:07,485 ERROR [Timer-Driven Process Thread-5] o.a.nifi.processors.standard.ForkRecord ForkRecord[id=4e463376-2d56-1d09-dbf6-1cf419144e8a] Failed to fork StandardFlowFileRecord[uuid=82366dc4-e256-40c4-aec9-d5c681d0ba16,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1580234187165-76713, container=default, section=937], offset=487914, length=81319],offset=0,name=82366dc4-e256-40c4-aec9-d5c681d0ba16,size=81319]: org.apache.nifi.processor.exception.FlowFileAccessException: Failed to write to Content Repository for StandardFlowFileRecord[uuid=d55d9aa0-6b0b-4f6d-aca6-b340467cf29e,claim=,offset=0,name=f15480b3-57da-4e3e-9f5d-5cf2839bc2bd,size=0]
org.apache.nifi.processor.exception.FlowFileAccessException: Failed to write to Content Repository for StandardFlowFileRecord[uuid=d55d9aa0-6b0b-4f6d-aca6-b340467cf29e,claim=,offset=0,name=f15480b3-57da-4e3e-9f5d-5cf2839bc2bd,size=0]
    at org.apache.nifi.controller.repository.StandardProcessSession$7.write(StandardProcessSession.java:2641)
    at org.apache.nifi.controller.repository.io.TaskTerminationOutputStream.write(TaskTerminationOutputStream.java:62)
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
    at org.apache.avro.io.BufferedBinaryEncoder$OutputStreamSink.innerWrite(BufferedBinaryEncoder.java:216)
    at org.apache.avro.io.BufferedBinaryEncoder.writeFixed(BufferedBinaryEncoder.java:150)
    at org.apache.avro.io.BlockingBinaryEncoder.flush(BlockingBinaryEncoder.java:244)
    at org.apache.nifi.avro.WriteAvroResultWithExternalSchema.flush(WriteAvroResultWithExternalSchema.java:94)
    at org.apache.nifi.avro.WriteAvroResultWithExternalSchema.onFinishRecordSet(WriteAvroResultWithExternalSchema.java:74)
    at org.apache.nifi.serialization.AbstractRecordSetWriter.finishRecordSet(AbstractRecordSetWriter.java:91)
    at org.apache.nifi.processors.standard.ForkRecord$1.process(ForkRecord.java:319)
    at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:2315)
    at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:2283)
    at org.apache.nifi.processors.standard.ForkRecord.onTrigger(ForkRecord.java:238)
    at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
    at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1176)
    at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:213)
    at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
    at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Stream is closed
{code}

I have attached a template that reproduces the bug. The JSON data from the 
example was modified so that one of the accounts has more transactions, which 
makes the bug easier to trigger.
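
For anyone without access to the attachment, input of a similar shape could be generated with a small script. This is only a rough sketch, not the data in the attached template: the field names (`accounts`, `transactions`, `id`, `amount`, `balance`), the helper `build_records`, the output file name, and the transaction count are all assumptions loosely modeled on the multi-nested array example in the ForkRecord documentation.

```python
import json


def build_records(num_transactions=1000):
    """Build one customer record whose first account holds many transactions.

    All field names and values here are illustrative assumptions, not the
    contents of the attached template.
    """
    # Many entries in a single nested array, so one FlowFile forks into many records.
    transactions = [
        {"id": i, "amount": round(10.0 + i * 0.25, 2)}
        for i in range(num_transactions)
    ]
    return [
        {
            "id": 1,
            "name": "John Doe",
            "accounts": [
                {"id": 42, "balance": 4750.89, "transactions": transactions},
                {
                    "id": 43,
                    "balance": 48212.38,
                    "transactions": [{"id": 0, "amount": -50.0}],
                },
            ],
        }
    ]


if __name__ == "__main__":
    # Hypothetical output file name; feed it to the flow with e.g. GetFile.
    with open("forkrecord_input.json", "w") as fh:
        json.dump(build_records(), fh, indent=2)
```

Several copies of such a file queued in front of a ForkRecord processor that runs with more than one concurrent task may help trigger the error, although the exact volume needed is not known.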



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
