Hi Joe,
Thanks for the explanations. They are really useful for understanding how it works.
Good to know that this will be improved in the future.
Regarding the append-to-HDFS issue, let me recap. My flow is:
ListHDFS -> FetchHDFS -> UnpackContent -> SplitText(5000) -> SplitText(1)
-> RouteOnContent -> MergeContent -> PutHDFS -> hdfs://dir1/f.csv
-> MergeContent -> PutHDFS -> hdfs://dir2/f.csv
-> MergeContent -> PutHDFS -> hdfs://dir3/f.csv
ListHDFS monitors an input folder where 300 MB zip files are added
periodically. Each file, uncompressed, is a 2.5 GB CSV.
So I am writing to HDFS from multiple PutHDFS processors, all of them with
conflict resolution set to *APPEND* but each with a different output folder.
The file name, however, is always the same, *f.csv*: it is taken from the
flow files, which carry the name of the original uncompressed file (I think
this happens in the MergeContent processor).
All of these processors run with 1 concurrent task, yet for some reason it
seems we cannot append to HDFS concurrently, even though we are appending to
different files in different folders. Any ideas how to mitigate this?
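If I am reading the stack trace from my previous email correctly, the error
is raised when the same HDFS client tries to open a second append stream on a
file it already has open. Outside of NiFi I believe the equivalent would be
something like the following standalone sketch (untested; the path and
configuration are just placeholders):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class AppendLeaseConflict {
    public static void main(String[] args) throws Exception {
        // Picks up fs.defaultFS etc. from core-site.xml on the classpath.
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Path file = new Path("/nifi_out/unmatched/log20160930.csv");

        // The first append acquires the HDFS lease on the file for this client.
        FSDataOutputStream first = fs.append(file);

        // A second append from the same client, while the first stream is
        // still open, should fail with AlreadyBeingCreatedException:
        // "... is already the current lease holder."
        FSDataOutputStream second = fs.append(file);

        second.close();
        first.close();
    }
}

So my guess is that the appends issued by my PutHDFS processors somehow hit
the same file from the same underlying client before the previous stream has
been closed.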
It seems other people have encountered this with NiFi
<https://community.hortonworks.com/questions/61096/puthdfs-leaseexpiredexception-error-when-running-m.html>
but there is no conclusive solution. It also seems that appending to HDFS is
somewhat problematic in general
<http://community.cloudera.com/t5/Storage-Random-Access-HDFS/How-to-append-files-to-HDFS-with-Java-quot-current-leaseholder/td-p/41369>.
Stepping back, the reason I am using append in PutHDFS is that I could not
find a setting in the MergeContent processors that allows creating multiple
bundled flow files with the same root name but different sequence numbers or
timestamps (like f.csv.1, f.csv.2, ...). They all get the same name, f.csv.
Is that possible somehow?
See my detailed MergeContent processor config below.
So basically I have a 2.5 GB CSV file that eventually gets broken up into
lines, and the lines get merged back together into 10 MB bundles, but when
those bundles are emitted to PutHDFS they all carry the same name as the
original file, over and over again. I would like each bundle to get a
distinct name based on, say, a timestamp or a sequence number, so that I can
avoid the APPEND conflict resolution in PutHDFS, which is causing me grief
right now. Is that possible?
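What I have in mind (please correct me if there is a better way) is an
UpdateAttribute processor between MergeContent and PutHDFS that rewrites the
filename attribute with a unique suffix, roughly like this (the exact
expression functions are just my guess):
<properties>
  <entry> <key>filename</key> <value>${filename}.${now():toNumber()}-${nextInt()}</value> </entry>
</properties>
Since PutHDFS takes the target file name from the filename attribute, each
bundle would then land as its own file and I could set conflict resolution
back to fail (or ignore) instead of append.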
Thanks,
M
Currently my MergeContent processor config is:
<properties>
  <entry> <key>Merge Strategy</key> <value>Bin-Packing Algorithm</value> </entry>
  <entry> <key>Merge Format</key> <value>Binary Concatenation</value> </entry>
  <entry> <key>Attribute Strategy</key> <value>Keep Only Common Attributes</value> </entry>
  <entry> <key>Correlation Attribute Name</key> </entry>
  <entry> <key>Minimum Number of Entries</key> <value>1</value> </entry>
  <entry> <key>Maximum Number of Entries</key> <value>1000</value> </entry>
  <entry> <key>Minimum Group Size</key> <value>0 B</value> </entry>
  <entry> <key>Maximum Group Size</key> <value>10 MB</value> </entry>
  <entry> <key>Max Bin Age</key> </entry>
  <entry> <key>Maximum number of Bins</key> <value>5</value> </entry>
  <entry> <key>Delimiter Strategy</key> <value>Text</value> </entry>
  <entry> <key>Header File</key> </entry>
  <entry> <key>Footer File</key> </entry>
  <entry> <key>Demarcator File</key> <value></value> </entry>
  <entry> <key>Compression Level</key> <value>1</value> </entry>
  <entry> <key>Keep Path</key> <value>false</value> </entry>
</properties>
On Wed, May 31, 2017 at 3:52 PM, Joe Witt <[email protected]> wrote:
> Split failed before even with backpressure:
> - yes that backpressure kicks in when destination queues for a given
> processor have reached their target size (in count of flowfiles or
> total size represented). However, to clarify why the OOM happened it
> is important to realize that it is not about 'flow files over a quick
> period of time' but rather 'flow files held within a single process
> session'. Your SplitText was pulling a single flowfile but then
> creating, let's say, 1,000,000 resulting flow files and then committing
> that change. That happens within a session. But all those flow file
> objects (not their content) are held in memory and at such high
> numbers it creates excessive heap usage. The two phase divide/conquer
> approach Koji suggested solves that and eventually we need to solve
> that by swapping out the flowfiles to disk within a session. We
> actually do swap out flowfiles sitting on queues after a certain
> threshold is reached for this very reason. This means you should be
> able to have many millions of flowfiles sitting around in the flow for
> whatever reason and not hit memory problems.
>
> Hope that helps there.
>
> On PutHDFS it looks like possibly two things are trying to append to
> the same file? If yes I'd really recommend not appending but rather
> use MergeContent to create data bundles of a given size then write
> those to HDFS.
>
> Thanks
> Joe
>
> On Wed, May 31, 2017 at 10:33 AM, Martin Eden <[email protected]>
> wrote:
> > Hi Koji,
> >
> > Good to know that it can handle large files. I thought it was the case
> > but I was just not seeing it in practice.
> >
> > Yes I am using 'Line Split Count' as 1 at SplitText.
> >
> > I added the extra SplitText processor exactly as you suggested and the
> OOM
> > went away. So, big thanks!!!
> >
> > However I have 2 follow-up questions:
> >
> > 1. Before adding the extra SplitText processor I also played with the
> > back-pressure settings on the outbound queue of the original SplitText
> > processor, since you mentioned that it is generating files at a rate that
> > is too high, I figure the queue should slow it down. I tried a limit of
> > 100MB or 1000 files and I still got the OOMs in the SplitText processor.
> > Why isn't the queue back-pressure helping me in this case? Where would
> that
> > come in handy then? Why is the extra SplitText processor needed to fix
> > things and not just the queue back-pressure?
> >
> > 2. I am now close to completing my flow but I am hitting another error.
> > This time it's the last stage, the PutHDFS throws
> > o.apache.nifi.processors.hadoop.PutHDFS
> > PutHDFS[id=1816e90b-ebf2-365f-c882-09f1faaec5aa] Failed to write to HDFS
> > due to org.apache.nifi.processor.exception.ProcessException: IOException
> > thrown from PutHDFS[id=1816e90b-ebf2-365f-c882-09f1faaec
> > See the full stacktrace below.
> > I have a parallelism of 1 for my PutHDFS processors. Any ideas why this
> is
> > happening?
> >
> > Thanks,
> > Martin
> >
> > 2017-05-31 13:50:29,341 ERROR [Timer-Driven Process Thread-5]
> > o.apache.nifi.processors.hadoop.PutHDFS
> > PutHDFS[id=1816e90b-ebf2-365f-c882-09f1faaec5aa] F
> >
> > ailed to write to HDFS due to
> > org.apache.nifi.processor.exception.ProcessException: IOException thrown
> > from PutHDFS[id=1816e90b-ebf2-365f-c882-09f1faaec
> >
> > 5aa]:
> > org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.
> AlreadyBeingCreatedException):
> > Failed to APPEND_FILE /nifi_out/unmatched/log
> >
> > 20160930.csv for DFSClient_NONMAPREDUCE_-1411681085_97 on 10.128.0.7
> > because DFSClient_NONMAPREDUCE_-1411681085_97 is already the current
> lease
> > holder.
> >
> > at
> > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.
> recoverLeaseInternal(FSNamesystem.java:2882)
> >
> > at
> > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFileInternal(
> FSNamesystem.java:2683)
> >
> > at
> > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.
> appendFileInt(FSNamesystem.java:2982)
> >
> > at
> > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.
> appendFile(FSNamesystem.java:2950)
> >
> > at
> > org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.
> append(NameNodeRpcServer.java:655)
> >
> > at
> > org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSi
> deTranslatorPB.append(ClientNamenodeProtocolServerSi
> deTranslatorPB.java:421)
> >
> > at
> > org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$
> ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.j
> >
> > ava)
> >
> > at
> > org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(
> ProtobufRpcEngine.java:616)
> >
> > at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
> >
> > at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
> >
> > at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
> >
> > at java.security.AccessController.doPrivileged(Native Method)
> >
> > at javax.security.auth.Subject.doAs(Subject.java:422)
> >
> > at
> > org.apache.hadoop.security.UserGroupInformation.doAs(
> UserGroupInformation.java:1698)
> >
> > at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043)
> >
> > : {}
> >
> > org.apache.nifi.processor.exception.ProcessException: IOException thrown
> > from PutHDFS[id=1816e90b-ebf2-365f-c882-09f1faaec5aa]:
> > org.apache.hadoop.ipc.Re
> >
> > moteException(org.apache.hadoop.hdfs.protocol.
> AlreadyBeingCreatedException):
> > Failed to APPEND_FILE /nifi_out/unmatched/log20160930.csv for
> DFSClient_NON
> >
> > MAPREDUCE_-1411681085_97 on 10.128.0.7 because
> > DFSClient_NONMAPREDUCE_-1411681085_97 is already the current lease
> holder.
> >
> > at
> > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.
> recoverLeaseInternal(FSNamesystem.java:2882)
> >
> > at
> > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFileInternal(
> FSNamesystem.java:2683)
> >
> > at
> > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.
> appendFileInt(FSNamesystem.java:2982)
> >
> > at
> > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.
> appendFile(FSNamesystem.java:2950)
> >
> > at
> > org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.
> append(NameNodeRpcServer.java:655)
> >
> > at
> > org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSi
> deTranslatorPB.append(ClientNamenodeProtocolServerSi
> deTranslatorPB.java:421)
> >
> > at
> > org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$
> ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.j
> >
> > ava)
> >
> > at
> > org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(
> ProtobufRpcEngine.java:616)
> >
> > at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
> >
> > at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
> >
> > at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
> >
> > at java.security.AccessController.doPrivileged(Native Method)
> >
> > at javax.security.auth.Subject.doAs(Subject.java:422)
> >
> > at
> > org.apache.hadoop.security.UserGroupInformation.doAs(
> UserGroupInformation.java:1698)
> >
> > at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043)
> >
> >
> > at
> > org.apache.nifi.controller.repository.StandardProcessSession.read(
> StandardProcessSession.java:2148)
> >
> > at
> > org.apache.nifi.controller.repository.StandardProcessSession.read(
> StandardProcessSession.java:2095)
> >
> > at org.apache.nifi.processors.hadoop.PutHDFS$1.run(PutHDFS.
> java:293)
> >
> > at java.security.AccessController.doPrivileged(Native Method)
> >
> > at javax.security.auth.Subject.doAs(Subject.java:360)
> >
> > at
> > org.apache.hadoop.security.UserGroupInformation.doAs(
> UserGroupInformation.java:1678)
> >
> > at
> > org.apache.nifi.processors.hadoop.PutHDFS.onTrigger(PutHDFS.java:223)
> >
> > at
> > org.apache.nifi.processor.AbstractProcessor.onTrigger(
> AbstractProcessor.java:27)
> >
> > at
> > org.apache.nifi.controller.StandardProcessorNode.onTrigger(
> StandardProcessorNode.java:1118)
> >
> > at
> > org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(
> ContinuallyRunProcessorTask.java:144)
> >
> > at
> > org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(
> ContinuallyRunProcessorTask.java:47)
> >
> > at
> > org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(
> TimerDrivenSchedulingAgent.java:132)
> >
> > at
> > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> >
> > at java.util.concurrent.FutureTask.runAndReset(
> FutureTask.java:308)
> >
> > at
> > java.util.concurrent.ScheduledThreadPoolExecutor$
> ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
> >
> > at
> > java.util.concurrent.ScheduledThreadPoolExecutor$
> ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
> >
> > at
> > java.util.concurrent.ThreadPoolExecutor.runWorker(
> ThreadPoolExecutor.java:1142)
> >
> > at
> > java.util.concurrent.ThreadPoolExecutor$Worker.run(
> ThreadPoolExecutor.java:617)
> >
> > at java.lang.Thread.run(Thread.java:748)
> >
> > Caused by: org.apache.hadoop.ipc.RemoteException: Failed to APPEND_FILE
> > /nifi_out/unmatched/log20160930.csv for
> > DFSClient_NONMAPREDUCE_-1411681085_97 on 10.128.0.7 because
> > DFSClient_NONMAPREDUCE_-1411681085_97 is already the current lease
> holder.
> >
> > at
> > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.
> recoverLeaseInternal(FSNamesystem.java:2882)
> >
> > at
> > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFileInternal(
> FSNamesystem.java:2683)
> >
> > at
> > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.
> appendFileInt(FSNamesystem.java:2982)
> >
> > at
> > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.
> appendFile(FSNamesystem.java:2950)
> >
> > at
> > org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.
> append(NameNodeRpcServer.java:655)
> >
> > at
> > org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSi
> deTranslatorPB.append(ClientNamenodeProtocolServerSi
> deTranslatorPB.java:421)
> >
> > at
> > org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$
> ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.
> java)
> >
> > at
> > org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(
> ProtobufRpcEngine.java:616)
> >
> > at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
> >
> > at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
> >
> > at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
> >
> > at java.security.AccessController.doPrivileged(Native Method)
> >
> > at javax.security.auth.Subject.doAs(Subject.java:422)
> >
> > at
> > org.apache.hadoop.security.UserGroupInformation.doAs(
> UserGroupInformation.java:1698)
> >
> > at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043)
> >
> >
> > at org.apache.hadoop.ipc.Client.call(Client.java:1475)
> >
> > at org.apache.hadoop.ipc.Client.call(Client.java:1412)
> >
> > at
> > org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.
> invoke(ProtobufRpcEngine.java:229)
> >
> > at com.sun.proxy.$Proxy188.append(Unknown Source)
> >
> > at
> > org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslat
> orPB.append(ClientNamenodeProtocolTranslatorPB.java:328)
> >
> > at sun.reflect.GeneratedMethodAccessor314.invoke(Unknown Source)
> >
> > at
> > sun.reflect.DelegatingMethodAccessorImpl.invoke(
> DelegatingMethodAccessorImpl.java:43)
> >
> > at java.lang.reflect.Method.invoke(Method.java:498)
> >
> > at
> > org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(
> RetryInvocationHandler.java:191)
> >
> > at
> > org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(
> RetryInvocationHandler.java:102)
> >
> > at com.sun.proxy.$Proxy194.append(Unknown Source)
> >
> > at org.apache.hadoop.hdfs.DFSClient.callAppend(
> DFSClient.java:1808)
> >
> > at org.apache.hadoop.hdfs.DFSClient.append(DFSClient.java:1877)
> >
> > at org.apache.hadoop.hdfs.DFSClient.append(DFSClient.java:1847)
> >
> > at
> > org.apache.hadoop.hdfs.DistributedFileSystem$4.
> doCall(DistributedFileSystem.java:340)
> >
> > at
> > org.apache.hadoop.hdfs.DistributedFileSystem$4.
> doCall(DistributedFileSystem.java:336)
> >
> > at
> > org.apache.hadoop.fs.FileSystemLinkResolver.resolve(
> FileSystemLinkResolver.java:81)
> >
> > at
> > org.apache.hadoop.hdfs.DistributedFileSystem.append(
> DistributedFileSystem.java:348)
> >
> > at
> > org.apache.hadoop.hdfs.DistributedFileSystem.append(
> DistributedFileSystem.java:318)
> >
> > at org.apache.hadoop.fs.FileSystem.append(FileSystem.java:1176)
> >
> > at
> > org.apache.nifi.processors.hadoop.PutHDFS$1$1.process(PutHDFS.java:301)
> >
> > at
> > org.apache.nifi.controller.repository.StandardProcessSession.read(
> StandardProcessSession.java:2125)
> >
> > ... 18 common frames omitted
> >
> > On Wed, May 31, 2017 at 10:29 AM, Koji Kawamura <[email protected]>
> > wrote:
> >
> >> Hi Martin,
> >>
> >> Generally, NiFi processor doesn't load entire content of file and is
> >> capable of handling huge files.
> >> However, having massive amount of FlowFiles can cause OOM issue as
> >> FlowFiles and its Attributes resides on heap.
> >>
> >> I assume you are using 'Line Split Count' as 1 at SplitText.
> >> We recommend using multiple SplitText processors so as not to generate
> >> many FlowFiles in a short period of time.
> >> For example, 1st SplitText splits files per 5,000 lines, then the 2nd
> >> SplitText splits into each line.
> >> This way, we can decrease number of FlowFiles at a given time
> >> requiring less heap.
> >>
> >> I hope this helps.
> >>
> >> Thanks,
> >> Koji
> >>
> >> On Wed, May 31, 2017 at 6:20 PM, Martin Eden <[email protected]>
> >> wrote:
> >> > Hi all,
> >> >
> >> > I have a vanilla Nifi 1.2.0 node with 1GB of heap.
> >> >
> >> > The flow I am trying to run is:
> >> > ListHDFS -> FetchHDFS -> SplitText -> RouteOnContent -> MergeContent
> ->
> >> > PutHDFS
> >> >
> >> > When I give it a 300MB input zip file (2.5GB uncompressed) I am
> getting
> >> > Java OutOfMemoryError as below.
> >> >
> >> > Does NiFi read the entire contents of files into memory? This is
> >> > unexpected. I thought it was chunking through files. Giving more RAM is
> >> > not a solution, as you can always get larger input files in the future.
> >> >
> >> > Does this mean NiFi is not suitable as a scalable ETL solution?
> >> >
> >> > Can someone please explain what is happening and how to mitigate large
> >> > files in NiFi? Any patterns?
> >> >
> >> > Thanks,
> >> > M
> >> >
> >> > ERROR [Timer-Driven Process Thread-9]
> >> > o.a.nifi.processors.standard.SplitText
> >> > SplitText[id=e16939ca-f28f-1178-b66e-054e43a0a724]
> >> > SplitText[id=e16939ca-f28f-1178-b66e-054e43a0a724] failed to process
> >> > session due to java.lang.OutOfMemoryError: Java heap space: {}
> >> >
> >> > java.lang.OutOfMemoryError: Java heap space
> >> >
> >> > at java.util.HashMap$EntrySet.iterator(HashMap.java:1013)
> >> >
> >> > at java.util.HashMap.putMapEntries(HashMap.java:511)
> >> >
> >> > at java.util.HashMap.<init>(HashMap.java:489)
> >> >
> >> > at
> >> > org.apache.nifi.controller.repository.StandardFlowFileRecord$
> >> Builder.initializeAttributes(StandardFlowFileRecord.java:219)
> >> >
> >> > at
> >> > org.apache.nifi.controller.repository.StandardFlowFileRecord$
> >> Builder.addAttributes(StandardFlowFileRecord.java:234)
> >> >
> >> > at
> >> > org.apache.nifi.controller.repository.StandardProcessSession.
> >> putAllAttributes(StandardProcessSession.java:1723)
> >> >
> >> > at
> >> > org.apache.nifi.processors.standard.SplitText.
> >> updateAttributes(SplitText.java:367)
> >> >
> >> > at
> >> > org.apache.nifi.processors.standard.SplitText.generateSplitFlowFiles(
> >> SplitText.java:320)
> >> >
> >> > at
> >> > org.apache.nifi.processors.standard.SplitText.onTrigger(
> >> SplitText.java:258)
> >> >
> >> > at
> >> > org.apache.nifi.processor.AbstractProcessor.onTrigger(
> >> AbstractProcessor.java:27)
> >> >
> >> > at
> >> > org.apache.nifi.controller.StandardProcessorNode.onTrigger(
> >> StandardProcessorNode.java:1118)
> >> >
> >> > at
> >> > org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(
> >> ContinuallyRunProcessorTask.java:144)
> >> >
> >> > at
> >> > org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(
> >> ContinuallyRunProcessorTask.java:47)
> >> >
> >> > at
> >> > org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.
> run(
> >> TimerDrivenSchedulingAgent.java:132)
> >> >
> >> > at
> >> > java.util.concurrent.Executors$RunnableAdapter.
> call(Executors.java:511)
> >> >
> >> > at java.util.concurrent.FutureTask.runAndReset(
> >> FutureTask.java:308)
> >> >
> >> > at
> >> > java.util.concurrent.ScheduledThreadPoolExecutor$
> >> ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
> >> >
> >> > at
> >> > java.util.concurrent.ScheduledThreadPoolExecutor$
> >> ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
> >> >
> >> > at
> >> > java.util.concurrent.ThreadPoolExecutor.runWorker(
> >> ThreadPoolExecutor.java:1142)
> >> >
> >> > at
> >> > java.util.concurrent.ThreadPoolExecutor$Worker.run(
> >> ThreadPoolExecutor.java:617)
> >> >
> >> > at java.lang.Thread.run(Thread.java:748)
> >>
>