Hi Koji,

Good to know that it can handle large files. I thought that was the case, but
I was just not seeing it in practice.

Yes, I am using a 'Line Split Count' of 1 in SplitText.

I added the extra SplitText processor exactly as you suggested and the OOM
went away. So, big thanks!!!

However, I have two follow-up questions:

1. Before adding the extra SplitText processor, I also played with the
back-pressure settings on the outbound queue of the original SplitText
processor. Since you mentioned that it generates FlowFiles at too high a
rate, I figured the queue should slow it down. I tried a limit of 100MB
or 1,000 FlowFiles and I still got the OOMs in the SplitText processor.
Why isn't queue back-pressure helping me in this case? Where would it
come in handy then? Why is the extra SplitText processor needed to fix
things, rather than just queue back-pressure?

2. I am now close to completing my flow, but I am hitting another error.
This time it's the last stage: PutHDFS throws

o.apache.nifi.processors.hadoop.PutHDFS
PutHDFS[id=1816e90b-ebf2-365f-c882-09f1faaec5aa] Failed to write to HDFS
due to org.apache.nifi.processor.exception.ProcessException: IOException
thrown from PutHDFS[id=1816e90b-ebf2-365f-c882-09f1faaec

See the full stack trace below.
I have a parallelism of 1 for my PutHDFS processors. Any ideas why this is
happening?

Thanks,
Martin

2017-05-31 13:50:29,341 ERROR [Timer-Driven Process Thread-5]
o.apache.nifi.processors.hadoop.PutHDFS
PutHDFS[id=1816e90b-ebf2-365f-c882-09f1faaec5aa] Failed to write to HDFS
due to org.apache.nifi.processor.exception.ProcessException: IOException
thrown from PutHDFS[id=1816e90b-ebf2-365f-c882-09f1faaec5aa]:
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException):
Failed to APPEND_FILE /nifi_out/unmatched/log20160930.csv for
DFSClient_NONMAPREDUCE_-1411681085_97 on 10.128.0.7
because DFSClient_NONMAPREDUCE_-1411681085_97 is already the current lease
holder.

        at
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:2882)

        at
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFileInternal(FSNamesystem.java:2683)

        at
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFileInt(FSNamesystem.java:2982)

        at
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFile(FSNamesystem.java:2950)

        at
org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.append(NameNodeRpcServer.java:655)

        at
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.append(ClientNamenodeProtocolServerSideTranslatorPB.java:421)

        at
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)

        at
org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)

        at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)

        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)

        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)

        at java.security.AccessController.doPrivileged(Native Method)

        at javax.security.auth.Subject.doAs(Subject.java:422)

        at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)

        at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043)

: {}

org.apache.nifi.processor.exception.ProcessException: IOException thrown
from PutHDFS[id=1816e90b-ebf2-365f-c882-09f1faaec5aa]:
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException):
Failed to APPEND_FILE /nifi_out/unmatched/log20160930.csv for
DFSClient_NONMAPREDUCE_-1411681085_97 on 10.128.0.7 because
DFSClient_NONMAPREDUCE_-1411681085_97 is already the current lease holder.

        at
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:2882)

        at
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFileInternal(FSNamesystem.java:2683)

        at
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFileInt(FSNamesystem.java:2982)

        at
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFile(FSNamesystem.java:2950)

        at
org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.append(NameNodeRpcServer.java:655)

        at
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.append(ClientNamenodeProtocolServerSideTranslatorPB.java:421)

        at
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)

        at
org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)

        at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)

        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)

        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)

        at java.security.AccessController.doPrivileged(Native Method)

        at javax.security.auth.Subject.doAs(Subject.java:422)

        at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)

        at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043)


        at
org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:2148)

        at
org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:2095)

        at org.apache.nifi.processors.hadoop.PutHDFS$1.run(PutHDFS.java:293)

        at java.security.AccessController.doPrivileged(Native Method)

        at javax.security.auth.Subject.doAs(Subject.java:360)

        at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1678)

        at
org.apache.nifi.processors.hadoop.PutHDFS.onTrigger(PutHDFS.java:223)

        at
org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)

        at
org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1118)

        at
org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:144)

        at
org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)

        at
org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:132)

        at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)

        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)

        at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)

        at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)

        at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)

        at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)

        at java.lang.Thread.run(Thread.java:748)

Caused by: org.apache.hadoop.ipc.RemoteException: Failed to APPEND_FILE
/nifi_out/unmatched/log20160930.csv for
DFSClient_NONMAPREDUCE_-1411681085_97 on 10.128.0.7 because
DFSClient_NONMAPREDUCE_-1411681085_97 is already the current lease holder.

        at
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:2882)

        at
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFileInternal(FSNamesystem.java:2683)

        at
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFileInt(FSNamesystem.java:2982)

        at
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFile(FSNamesystem.java:2950)

        at
org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.append(NameNodeRpcServer.java:655)

        at
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.append(ClientNamenodeProtocolServerSideTranslatorPB.java:421)

        at
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)

        at
org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)

        at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)

        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)

        at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)

        at java.security.AccessController.doPrivileged(Native Method)

        at javax.security.auth.Subject.doAs(Subject.java:422)

        at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)

        at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043)


        at org.apache.hadoop.ipc.Client.call(Client.java:1475)

        at org.apache.hadoop.ipc.Client.call(Client.java:1412)

        at
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)

        at com.sun.proxy.$Proxy188.append(Unknown Source)

        at
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.append(ClientNamenodeProtocolTranslatorPB.java:328)

        at sun.reflect.GeneratedMethodAccessor314.invoke(Unknown Source)

        at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

        at java.lang.reflect.Method.invoke(Method.java:498)

        at
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)

        at
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)

        at com.sun.proxy.$Proxy194.append(Unknown Source)

        at org.apache.hadoop.hdfs.DFSClient.callAppend(DFSClient.java:1808)

        at org.apache.hadoop.hdfs.DFSClient.append(DFSClient.java:1877)

        at org.apache.hadoop.hdfs.DFSClient.append(DFSClient.java:1847)

        at
org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:340)

        at
org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:336)

        at
org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)

        at
org.apache.hadoop.hdfs.DistributedFileSystem.append(DistributedFileSystem.java:348)

        at
org.apache.hadoop.hdfs.DistributedFileSystem.append(DistributedFileSystem.java:318)

        at org.apache.hadoop.fs.FileSystem.append(FileSystem.java:1176)

        at
org.apache.nifi.processors.hadoop.PutHDFS$1$1.process(PutHDFS.java:301)

        at
org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:2125)

        ... 18 common frames omitted

On Wed, May 31, 2017 at 10:29 AM, Koji Kawamura <[email protected]>
wrote:

> Hi Martin,
>
> Generally, NiFi processors don't load the entire content of a file, so
> they are capable of handling huge files.
> However, having a massive number of FlowFiles can cause OOM issues, as
> FlowFiles and their attributes reside on the heap.
>
> I assume you are using a 'Line Split Count' of 1 in SplitText.
> We recommend using multiple SplitText processors so as not to generate
> many FlowFiles in a short period of time.
> For example, the 1st SplitText splits the file into 5,000-line chunks,
> then the 2nd SplitText splits each chunk into individual lines.
> This way, we can decrease the number of FlowFiles at a given time,
> requiring less heap.
>
> I hope this helps.
>
> Thanks,
> Koji
>
> On Wed, May 31, 2017 at 6:20 PM, Martin Eden <[email protected]>
> wrote:
> > Hi all,
> >
> > I have a vanilla Nifi 1.2.0 node with 1GB of heap.
> >
> > The flow I am trying to run is:
> > ListHDFS -> FetchHDFS -> SplitText -> RouteOnContent -> MergeContent ->
> > PutHDFS
> >
> > When I give it a 300MB input zip file (2.5GB uncompressed), I get a
> > Java OutOfMemoryError, as below.
> >
> > Does NiFi read the entire contents of files into memory? This is
> > unexpected; I thought it chunked through files. Giving it more RAM is
> > not a solution, as you can always get larger input files in the future.
> >
> > Does this mean NiFi is not suitable as a scalable ETL solution?
> >
> > Can someone please explain what is happening, and how to handle large
> > files in NiFi? Any patterns?
> >
> > Thanks,
> > M
> >
> > ERROR [Timer-Driven Process Thread-9]
> > o.a.nifi.processors.standard.SplitText
> > SplitText[id=e16939ca-f28f-1178-b66e-054e43a0a724]
> > SplitText[id=e16939ca-f28f-1178-b66e-054e43a0a724] failed to process
> > session due to java.lang.OutOfMemoryError: Java heap space: {}
> >
> > java.lang.OutOfMemoryError: Java heap space
> >
> >         at java.util.HashMap$EntrySet.iterator(HashMap.java:1013)
> >
> >         at java.util.HashMap.putMapEntries(HashMap.java:511)
> >
> >         at java.util.HashMap.<init>(HashMap.java:489)
> >
> >         at
> > org.apache.nifi.controller.repository.StandardFlowFileRecord$Builder.initializeAttributes(StandardFlowFileRecord.java:219)
> >
> >         at
> > org.apache.nifi.controller.repository.StandardFlowFileRecord$Builder.addAttributes(StandardFlowFileRecord.java:234)
> >
> >         at
> > org.apache.nifi.controller.repository.StandardProcessSession.putAllAttributes(StandardProcessSession.java:1723)
> >
> >         at
> > org.apache.nifi.processors.standard.SplitText.updateAttributes(SplitText.java:367)
> >
> >         at
> > org.apache.nifi.processors.standard.SplitText.generateSplitFlowFiles(SplitText.java:320)
> >
> >         at
> > org.apache.nifi.processors.standard.SplitText.onTrigger(SplitText.java:258)
> >
> >         at
> > org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
> >
> >         at
> > org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1118)
> >
> >         at
> > org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:144)
> >
> >         at
> > org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
> >
> >         at
> > org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:132)
> >
> >         at
> > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> >
> >         at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
> >
> >         at
> > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
> >
> >         at
> > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
> >
> >         at
> > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> >
> >         at
> > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> >
> >         at java.lang.Thread.run(Thread.java:748)
>
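
For anyone who finds this thread later: here is a back-of-the-envelope sketch of why Koji's two-stage SplitText suggestion avoids the OOM. All of the numbers below (line count, per-FlowFile heap cost) are illustrative assumptions, not measurements from this flow; only the 5,000-line chunk size comes from Koji's example.

```python
# Back-of-the-envelope heap estimate: one-stage vs two-stage SplitText.
# Hypothetical assumptions: a 2.5 GB text file with ~25 million lines, and
# roughly 1 KB of heap per FlowFile for its attribute map and bookkeeping.
LINES = 25_000_000
HEAP_PER_FLOWFILE = 1_024  # bytes; rough assumption

# One-stage: a single SplitText with 'Line Split Count' = 1 produces all
# per-line FlowFiles in one session, so they are all live on the heap at once.
one_stage_flowfiles = LINES
one_stage_heap_gb = one_stage_flowfiles * HEAP_PER_FLOWFILE / 1024**3

# Two-stage: the 1st SplitText emits 5,000-line chunks; the 2nd splits one
# chunk at a time into lines. At any moment roughly (number of chunks) +
# (one chunk's worth of lines) are live.
CHUNK = 5_000
two_stage_flowfiles = LINES // CHUNK + CHUNK
two_stage_heap_gb = two_stage_flowfiles * HEAP_PER_FLOWFILE / 1024**3

print(f"one-stage: {one_stage_flowfiles:,} FlowFiles ~ {one_stage_heap_gb:.1f} GB heap")
print(f"two-stage: {two_stage_flowfiles:,} FlowFiles ~ {two_stage_heap_gb:.3f} GB heap")
```

Under these assumptions the one-stage estimate exceeds a 1 GB heap by more than an order of magnitude, while the two-stage estimate is negligible, which is consistent with the OOM disappearing after the second SplitText was added.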
