Split failed before even with backpressure:
- yes that backpressure kicks in when destination queues for a given
processor have reached their target size (in count of flowfiles or
total size represented).  However, to clarify why the OOM happened it
is important to realize that it is not about 'flow files over a quick
period of time' but rather 'flow files held within a single process
session.  Your SplitText was pulling a single flowfile but then
creating lets say 1,000,000 resulting flow files and then committing
that change.  That happens within a session.  But all those flow file
objects (not their content) are held in memory and at such high
numbers it creates excessive heap usage.  The two phase divide/conquer
approach Koji suggested solves that and eventually we need to solve
that by swapping out the flowfiles to disk within a session.  We
actually do swap out flowfiles sitting on queues after a certain
threshold is reached for this very reason.  This means you should be
able to have many millions of flowfiles sitting around in the flow for
whatever reason and not hit memory problems.

Hope that helps there.

On PutHDFS it looks like possibly two things are trying to append to
the same file?  If yes I'd really recommend not appending but rather
use MergeContent to create data bundles of a given size then write
those to HDFS.

Thanks
Joe

On Wed, May 31, 2017 at 10:33 AM, Martin Eden <[email protected]> wrote:
> Hi Koji,
>
> Good to know that it can handle large files. I thought it was the case but
> I was just not seeing in practice.
>
> Yes I am using 'Line Split Count' as 1 at SplitText.
>
> I added the extra SplitText processor exactly as you suggested and the OOM
> went away. So, big thanks!!!
>
> However I have 2 follow-up questions:
>
> 1. Before adding the extra SplitText processor I also played with the
> back-pressure settings on the outbound queue of the original SplitText
> processor, since you mentioned that it is generating files at a rate that
> is too high, I figure the queue should slow it down. I tried a limit of
> 100MB or 1000 files and I still got the OOMs in the SplitText processor.
> Why isn't the queue back-pressure helping me in this case? Where would that
> come in handy then? Why id the extra SplitText processor needed to fix
> things and not just the queue back-pressure?
>
> 2. I am now close to completing my flow but I am hitting another error.
> This time it's the last stage, the PutHDFS throws
> o.apache.nifi.processors.hadoop.PutHDFS
> PutHDFS[id=1816e90b-ebf2-365f-c882-09f1faaec5aa] Failed to write to HDFS
> due to org.apache.nifi.processor.exception.ProcessException: IOException
> thrown from PutHDFS[id=1816e90b-ebf2-365f-c882-09f1faaec
> See the full stacktrace below.
> I have a parallelism of 1 for my PutHDFS processors. Any ideas why this is
> happening?
>
> Thanks,
> Martin
>
> 2017-05-31 13:50:29,341 ERROR [Timer-Driven Process Thread-5]
> o.apache.nifi.processors.hadoop.PutHDFS
> PutHDFS[id=1816e90b-ebf2-365f-c882-09f1faaec5aa] F
>
> ailed to write to HDFS due to
> org.apache.nifi.processor.exception.ProcessException: IOException thrown
> from PutHDFS[id=1816e90b-ebf2-365f-c882-09f1faaec
>
> 5aa]:
> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException):
> Failed to APPEND_FILE /nifi_out/unmatched/log
>
> 20160930.csv for DFSClient_NONMAPREDUCE_-1411681085_97 on 10.128.0.7
> because DFSClient_NONMAPREDUCE_-1411681085_97 is already the current lease
> holder.
>
>         at
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:2882)
>
>         at
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFileInternal(FSNamesystem.java:2683)
>
>         at
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFileInt(FSNamesystem.java:2982)
>
>         at
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFile(FSNamesystem.java:2950)
>
>         at
> org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.append(NameNodeRpcServer.java:655)
>
>         at
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.append(ClientNamenodeProtocolServerSideTranslatorPB.java:421)
>
>         at
> org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.j
>
> ava)
>
>         at
> org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
>
>         at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
>
>         at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
>
>         at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
>
>         at java.security.AccessController.doPrivileged(Native Method)
>
>         at javax.security.auth.Subject.doAs(Subject.java:422)
>
>         at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
>
>         at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043)
>
> : {}
>
> org.apache.nifi.processor.exception.ProcessException: IOException thrown
> from PutHDFS[id=1816e90b-ebf2-365f-c882-09f1faaec5aa]:
> org.apache.hadoop.ipc.Re
>
> moteException(org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException):
> Failed to APPEND_FILE /nifi_out/unmatched/log20160930.csv for DFSClient_NON
>
> MAPREDUCE_-1411681085_97 on 10.128.0.7 because
> DFSClient_NONMAPREDUCE_-1411681085_97 is already the current lease holder.
>
>         at
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:2882)
>
>         at
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFileInternal(FSNamesystem.java:2683)
>
>         at
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFileInt(FSNamesystem.java:2982)
>
>         at
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFile(FSNamesystem.java:2950)
>
>         at
> org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.append(NameNodeRpcServer.java:655)
>
>         at
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.append(ClientNamenodeProtocolServerSideTranslatorPB.java:421)
>
>         at
> org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.j
>
> ava)
>
>         at
> org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
>
>         at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
>
>         at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
>
>         at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
>
>         at java.security.AccessController.doPrivileged(Native Method)
>
>         at javax.security.auth.Subject.doAs(Subject.java:422)
>
>         at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
>
>         at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043)
>
>
>         at
> org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:2148)
>
>         at
> org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:2095)
>
>         at org.apache.nifi.processors.hadoop.PutHDFS$1.run(PutHDFS.java:293)
>
>         at java.security.AccessController.doPrivileged(Native Method)
>
>         at javax.security.auth.Subject.doAs(Subject.java:360)
>
>         at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1678)
>
>         at
> org.apache.nifi.processors.hadoop.PutHDFS.onTrigger(PutHDFS.java:223)
>
>         at
> org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
>
>         at
> org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1118)
>
>         at
> org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:144)
>
>         at
> org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
>
>         at
> org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:132)
>
>         at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>
>         at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>
>         at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>
>         at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>
>         at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>
>         at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>
>         at java.lang.Thread.run(Thread.java:748)
>
> Caused by: org.apache.hadoop.ipc.RemoteException: Failed to APPEND_FILE
> /nifi_out/unmatched/log20160930.csv for
> DFSClient_NONMAPREDUCE_-1411681085_97 on 10.128.0.7 because
> DFSClient_NONMAPREDUCE_-1411681085_97 is already the current lease holder.
>
>         at
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:2882)
>
>         at
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFileInternal(FSNamesystem.java:2683)
>
>         at
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFileInt(FSNamesystem.java:2982)
>
>         at
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFile(FSNamesystem.java:2950)
>
>         at
> org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.append(NameNodeRpcServer.java:655)
>
>         at
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.append(ClientNamenodeProtocolServerSideTranslatorPB.java:421)
>
>         at
> org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
>
>         at
> org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
>
>         at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
>
>         at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
>
>         at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
>
>         at java.security.AccessController.doPrivileged(Native Method)
>
>         at javax.security.auth.Subject.doAs(Subject.java:422)
>
>         at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
>
>         at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043)
>
>
>         at org.apache.hadoop.ipc.Client.call(Client.java:1475)
>
>         at org.apache.hadoop.ipc.Client.call(Client.java:1412)
>
>         at
> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
>
>         at com.sun.proxy.$Proxy188.append(Unknown Source)
>
>         at
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.append(ClientNamenodeProtocolTranslatorPB.java:328)
>
>         at sun.reflect.GeneratedMethodAccessor314.invoke(Unknown Source)
>
>         at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>
>         at java.lang.reflect.Method.invoke(Method.java:498)
>
>         at
> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
>
>         at
> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
>
>         at com.sun.proxy.$Proxy194.append(Unknown Source)
>
>         at org.apache.hadoop.hdfs.DFSClient.callAppend(DFSClient.java:1808)
>
>         at org.apache.hadoop.hdfs.DFSClient.append(DFSClient.java:1877)
>
>         at org.apache.hadoop.hdfs.DFSClient.append(DFSClient.java:1847)
>
>         at
> org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:340)
>
>         at
> org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:336)
>
>         at
> org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
>
>         at
> org.apache.hadoop.hdfs.DistributedFileSystem.append(DistributedFileSystem.java:348)
>
>         at
> org.apache.hadoop.hdfs.DistributedFileSystem.append(DistributedFileSystem.java:318)
>
>         at org.apache.hadoop.fs.FileSystem.append(FileSystem.java:1176)
>
>         at
> org.apache.nifi.processors.hadoop.PutHDFS$1$1.process(PutHDFS.java:301)
>
>         at
> org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:2125)
>
>         ... 18 common frames omitted
>
> On Wed, May 31, 2017 at 10:29 AM, Koji Kawamura <[email protected]>
> wrote:
>
>> Hi Martin,
>>
>> Generally, NiFi processor doesn't load entire content of file and is
>> capable of handling huge files.
>> However, having massive amount of FlowFiles can cause OOM issue as
>> FlowFiles and its Attributes resides on heap.
>>
>> I assume you are using 'Line Split Count' as 1 at SplitText.
>> We recommend to use multiple SplitText processors to not generate many
>> FlowFiles in a short period of time.
>> For example, 1st SplitText splits files per 5,000 lines, then the 2nd
>> SplitText splits into each line.
>> This way, we can decrease number of FlowFiles at a given time
>> requiring less heap.
>>
>> I hope this helps.
>>
>> Thanks,
>> Koji
>>
>> On Wed, May 31, 2017 at 6:20 PM, Martin Eden <[email protected]>
>> wrote:
>> > Hi all,
>> >
>> > I have a vanilla Nifi 1.2.0 node with 1GB of heap.
>> >
>> > The flow I am trying to run is:
>> > ListHDFS -> FetchHDFS -> SplitText -> RouteOnContent -> MergeContent ->
>> > PutHDFS
>> >
>> > When I give it a 300MB input zip file (2.5GB uncompressed) I am getting
>> > Java OutOfMemoryError as below.
>> >
>> > Does NiFi read in the entire contents of files in memory? This is
>> > unexpected. I thought it is chunking through files. Giving more ram is
>> not
>> > a solution as you can always get larger input files in the future.
>> >
>> > Does this mean NiFi is not suitable as a scalable ETL solution?
>> >
>> > Can someone please explain what is happening and how to mitigate large
>> > files in NiFi? Any patterns?
>> >
>> > Thanks,
>> > M
>> >
>> > ERROR [Timer-Driven Process Thread-9]
>> > o.a.nifi.processors.standard.SplitText
>> > SplitText[id=e16939ca-f28f-1178-b66e-054e43a0a724]
>> > SplitText[id=e16939ca-f28f-1178-b66e-054e43a0a724] failed to process
>> > session due to java.lang.OutOfMemoryError: Java heap space: {}
>> >
>> > java.lang.OutOfMemoryError: Java heap space
>> >
>> >         at java.util.HashMap$EntrySet.iterator(HashMap.java:1013)
>> >
>> >         at java.util.HashMap.putMapEntries(HashMap.java:511)
>> >
>> >         at java.util.HashMap.<init>(HashMap.java:489)
>> >
>> >         at
>> > org.apache.nifi.controller.repository.StandardFlowFileRecord$
>> Builder.initializeAttributes(StandardFlowFileRecord.java:219)
>> >
>> >         at
>> > org.apache.nifi.controller.repository.StandardFlowFileRecord$
>> Builder.addAttributes(StandardFlowFileRecord.java:234)
>> >
>> >         at
>> > org.apache.nifi.controller.repository.StandardProcessSession.
>> putAllAttributes(StandardProcessSession.java:1723)
>> >
>> >         at
>> > org.apache.nifi.processors.standard.SplitText.
>> updateAttributes(SplitText.java:367)
>> >
>> >         at
>> > org.apache.nifi.processors.standard.SplitText.generateSplitFlowFiles(
>> SplitText.java:320)
>> >
>> >         at
>> > org.apache.nifi.processors.standard.SplitText.onTrigger(
>> SplitText.java:258)
>> >
>> >         at
>> > org.apache.nifi.processor.AbstractProcessor.onTrigger(
>> AbstractProcessor.java:27)
>> >
>> >         at
>> > org.apache.nifi.controller.StandardProcessorNode.onTrigger(
>> StandardProcessorNode.java:1118)
>> >
>> >         at
>> > org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(
>> ContinuallyRunProcessorTask.java:144)
>> >
>> >         at
>> > org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(
>> ContinuallyRunProcessorTask.java:47)
>> >
>> >         at
>> > org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(
>> TimerDrivenSchedulingAgent.java:132)
>> >
>> >         at
>> > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>> >
>> >         at java.util.concurrent.FutureTask.runAndReset(
>> FutureTask.java:308)
>> >
>> >         at
>> > java.util.concurrent.ScheduledThreadPoolExecutor$
>> ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>> >
>> >         at
>> > java.util.concurrent.ScheduledThreadPoolExecutor$
>> ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>> >
>> >         at
>> > java.util.concurrent.ThreadPoolExecutor.runWorker(
>> ThreadPoolExecutor.java:1142)
>> >
>> >         at
>> > java.util.concurrent.ThreadPoolExecutor$Worker.run(
>> ThreadPoolExecutor.java:617)
>> >
>> >         at java.lang.Thread.run(Thread.java:748)
>>

Reply via email to