1 vcore, which is not even a full core (a shared, oversubscribed CPU core). I'm not sure what you expected to see when you raised concurrency to 10 :)
There are a lot of things NiFi is doing behind the scenes, especially around provenance recording. I don't recommend anything below 4 cores for a meaningful experience. If in a cloud, go to 8 cores per VM, unless you are designing for a low footprint with MiNiFi.

Andrew

On Fri, Jun 2, 2017, 6:30 AM Martin Eden <[email protected]> wrote:
> Thanks Andrew,
>
> I have added UpdateAttribute processors to update the file names like you
> said. Now it works, writing out 1 MB files at a time (I updated the
> MergeContent Maximum Number of Entries to 10000 to achieve that, since
> each line in my csv is 100 bytes).
>
> The current flow is:
> ListHDFS -> FetchHDFS -> UnpackContent -> SplitText(5000) -> SplitText(1)
>   -> RouteOnContent -> MergeContent -> UpdateAttribute -> PutHDFS
>                     -> MergeContent -> UpdateAttribute -> PutHDFS
>                     -> MergeContent -> UpdateAttribute -> PutHDFS
>
> So now let's talk performance.
>
> With a 1-node NiFi running on a Google Compute Engine instance with 1 core,
> 3.7 GB RAM and a 20 GB disk, when I feed one 300 MB zip file
> (2.5 GB of csv text uncompressed) to this flow, it basically never finishes
> the job of transferring all the data.
>
> The inbound queue of RouteOnContent is always red and its outbound queues
> are mostly green, which indicates that this processor is the bottleneck. To
> mitigate this I increased its number of concurrent tasks to 10 and then
> observed: 10 tasks in progress, outbound queues temporarily red, average
> task latency increased from 2 ms to 20 ms, CPU on the box maxed out at 100%
> by the NiFi java process, load average 5.
>
> I then decreased the number of concurrent tasks of RouteOnContent to 5 and
> the average task time dropped to about half, as expected, with the CPU
> still 100% taken by the NiFi java process.
>
> The RouteOnContent has 3 simple regexes that it applies.
>
> Questions:
>
> 1. Is it safe to say that I maxed out the performance of this flow on one
> box with 1 core and 3.7 GB RAM?
>
> 2. The performance seems a lot lower than expected though, which is
> worrying. Is this expected? I am planning to do this at a much larger
> scale, hundreds of GBs.
>
> 3. Is the RouteOnContent that I am using hitting NiFi hard? Is this not a
> recommended use case? Is there anything obviously wrong in my flow?
> Doing a bit of digging around in docs, presentations and other people's
> experience, it seems that NiFi's sweet spot is routing files based on
> metadata (attributes) and not really based on the actual contents of the
> files.
>
> 4. Is NiFi suitable for large-scale ETL, i.e. copying and doing simple
> massaging of data from file system A to file system B? From database A to
> database B? This is what I am evaluating it for.
>
> I do see how running this on a box with more CPU and RAM and faster disks
> (vertical scaling) would improve the performance, and then adding another
> node to the cluster. But I want to first validate the choice of
> benchmarking flow and understand the performance on one box.
>
> Thanks a lot to all the people helping me on this thread on my NiFi
> evaluation journey. This is a really big plus for community support of
> NiFi.
>
> M
>
> On Thu, Jun 1, 2017 at 1:30 PM, Andrew Grande <[email protected]> wrote:
>
> > It looks like your max bin size is 1000 entries and 10 MB. Every time
> > you hit those limits, it will write out a merged file. Update the
> > filename attribute to be unique before writing via PutHDFS.
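> >
> > A minimal sketch of one way to do that, assuming an UpdateAttribute
> > processor placed just before PutHDFS; "filename" is the standard core
> > attribute, and the exact expression here is illustrative:
> >
> >     filename = ${filename:substringBeforeLast('.csv')}-${now():toNumber()}.csv
> >
> > Appending a millisecond timestamp (or ${UUID()}) gives each merged
> > bundle a distinct name, so PutHDFS never has to append to an existing
> > file.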
> >
> > Andrew
> >
> > On Thu, Jun 1, 2017, 2:24 AM Martin Eden <[email protected]> wrote:
> >
> > > Hi Joe,
> > >
> > > Thanks for the explanations. Really useful in understanding how it
> > > works. Good to know that in the future this will be improved.
> > >
> > > About the appending-to-HDFS issue, let me recap. My flow is:
> > > ListHDFS -> FetchHDFS -> UnpackContent -> SplitText(5000) -> SplitText(1)
> > >   -> RouteOnContent -> MergeContent -> PutHDFS -> hdfs://dir1/f.csv
> > >                     -> MergeContent -> PutHDFS -> hdfs://dir2/f.csv
> > >                     -> MergeContent -> PutHDFS -> hdfs://dir3/f.csv
> > >
> > > ListHDFS is monitoring an input folder where 300 MB zip files are
> > > added periodically. Each file uncompressed is 2.5 GB of csv.
> > >
> > > So I am writing out to HDFS from multiple PutHDFS processors, all of
> > > them having conflict resolution set to APPEND, and with different
> > > output folders.
> > >
> > > However, the file name will be the same, f.csv. It gets picked up
> > > from the name of the flow files, which bear the name of the original
> > > uncompressed file. This happens, I think, in the MergeContent
> > > processor.
> > >
> > > Since all of these processors are running with 1 concurrent task, it
> > > seems that we cannot append concurrently to HDFS, even though we are
> > > appending to different files in different folders, for some reason.
> > > Any ideas how to mitigate this?
> > >
> > > It seems other people have encountered this with NiFi, but there is
> > > no conclusive solution:
> > > https://community.hortonworks.com/questions/61096/puthdfs-leaseexpiredexception-error-when-running-m.html
> > > It also seems that appending to HDFS is somewhat problematic in
> > > general:
> > > http://community.cloudera.com/t5/Storage-Random-Access-HDFS/How-to-append-files-to-HDFS-with-Java-quot-current-leaseholder/td-p/41369
> > >
> > > So, stepping back, the reason I am doing append in PutHDFS is that I
> > > did not manage to find a setting in the MergeContent processors that
> > > allows creation of multiple bundled flow files with the same root
> > > name but different sequence numbers or timestamps (like f.csv.1,
> > > f.csv.2, ...). They all get the same name, which is f.csv. Is that
> > > possible somehow? See my detailed MergeContent processor config
> > > below.
> > >
> > > So basically I have a 2.5 GB csv file that eventually gets broken up
> > > into lines, and the lines get merged together in bundles of 10 MB,
> > > but when those bundles are emitted to PutHDFS they have the same name
> > > as the original file over and over again. I would like them to have a
> > > different name, based on a timestamp or sequence number let's say, so
> > > that I can avoid the append conflict resolution in PutHDFS, which is
> > > causing me grief right now. Is that possible?
> > >
> > > Thanks,
> > > M
> > >
> > > Currently my MergeContent processor config is:
> > > <properties>
> > >   <entry> <key>Merge Strategy</key> <value>Bin-Packing Algorithm</value> </entry>
> > >   <entry> <key>Merge Format</key> <value>Binary Concatenation</value> </entry>
> > >   <entry> <key>Attribute Strategy</key> <value>Keep Only Common Attributes</value> </entry>
> > >   <entry> <key>Correlation Attribute Name</key> </entry>
> > >   <entry> <key>Minimum Number of Entries</key> <value>1</value> </entry>
> > >   <entry> <key>Maximum Number of Entries</key> <value>1000</value> </entry>
> > >   <entry> <key>Minimum Group Size</key> <value>0 B</value> </entry>
> > >   <entry> <key>Maximum Group Size</key> <value>10 MB</value> </entry>
> > >   <entry> <key>Max Bin Age</key> </entry>
> > >   <entry> <key>Maximum number of Bins</key> <value>5</value> </entry>
> > >   <entry> <key>Delimiter Strategy</key> <value>Text</value> </entry>
> > >   <entry> <key>Header File</key> </entry>
> > >   <entry> <key>Footer File</key> </entry>
> > >   <entry> <key>Demarcator File</key> <value></value> </entry>
> > >   <entry> <key>Compression Level</key> <value>1</value> </entry>
> > >   <entry> <key>Keep Path</key> <value>false</value> </entry>
> > > </properties>
> > >
> > > On Wed, May 31, 2017 at 3:52 PM, Joe Witt <[email protected]> wrote:
> > >
> > > > Split failed before even with backpressure:
> > > > - Yes, backpressure kicks in when destination queues for a given
> > > > processor have reached their target size (in count of flow files or
> > > > total size represented). However, to clarify why the OOM happened,
> > > > it is important to realize that it is not about 'flow files over a
> > > > quick period of time' but rather 'flow files held within a single
> > > > process session'. Your SplitText was pulling a single flow file but
> > > > then creating, let's say, 1,000,000 resulting flow files and then
> > > > committing that change. That all happens within one session, and
> > > > all those flow file objects (not their content) are held in memory;
> > > > at such high numbers that creates excessive heap usage. The
> > > > two-phase divide/conquer approach Koji suggested solves that, and
> > > > eventually we need to solve it in the framework by swapping flow
> > > > files out to disk within a session. We actually do swap out flow
> > > > files sitting on queues after a certain threshold is reached, for
> > > > this very reason. This means you should be able to have many
> > > > millions of flow files sitting around in the flow for whatever
> > > > reason and not hit memory problems.
> > > >
> > > > Hope that helps there.
> > > >
> > > > On PutHDFS it looks like possibly two things are trying to append
> > > > to the same file? If yes, I'd really recommend not appending but
> > > > rather using MergeContent to create data bundles of a given size
> > > > and then writing those to HDFS.
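> > > >
> > > > For instance, a minimal MergeContent sizing sketch along those
> > > > lines (these are standard MergeContent properties; the values are
> > > > illustrative, not from this thread):
> > > >
> > > >     Minimum Number of Entries = 1
> > > >     Minimum Group Size        = 10 MB
> > > >     Max Bin Age               = 5 min
> > > >
> > > > A bin becomes eligible to merge once it holds at least 10 MB, and
> > > > Max Bin Age flushes any straggler bin after 5 minutes, so bundles
> > > > are always written out as new files and nothing needs appending.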
> > > >
> > > > Thanks
> > > > Joe
> > > >
> > > > On Wed, May 31, 2017 at 10:33 AM, Martin Eden <[email protected]> wrote:
> > > > > Hi Koji,
> > > > >
> > > > > Good to know that it can handle large files. I thought that was
> > > > > the case, but I was just not seeing it in practice.
> > > > >
> > > > > Yes, I am using a 'Line Split Count' of 1 at SplitText.
> > > > >
> > > > > I added the extra SplitText processor exactly as you suggested
> > > > > and the OOM went away. So, big thanks!!!
> > > > >
> > > > > However, I have 2 follow-up questions:
> > > > >
> > > > > 1. Before adding the extra SplitText processor I also played with
> > > > > the back-pressure settings on the outbound queue of the original
> > > > > SplitText processor. Since you mentioned that it is generating
> > > > > files at a rate that is too high, I figured the queue should slow
> > > > > it down. I tried a limit of 100 MB or 1000 files and I still got
> > > > > the OOMs in the SplitText processor. Why isn't the queue
> > > > > back-pressure helping me in this case? Where would it come in
> > > > > handy then? Why is the extra SplitText processor needed to fix
> > > > > things, and not just the queue back-pressure?
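> > > > >
> > > > > For context, back pressure is configured per connection via two
> > > > > thresholds (these are the standard connection settings; the
> > > > > values shown are just the ones tried above):
> > > > >
> > > > >     Back Pressure Object Threshold    = 1000 flow files
> > > > >     Back Pressure Data Size Threshold = 100 MB
> > > > >
> > > > > Note that back pressure only stops the upstream processor from
> > > > > being scheduled again; it cannot limit the flow files that a
> > > > > single already-running SplitText invocation creates within one
> > > > > session, all of which are held on the heap until the session
> > > > > commits. That is why the extra SplitText stage helps where back
> > > > > pressure cannot.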
> > > > >
> > > > > 2. I am now close to completing my flow but I am hitting another
> > > > > error. This time it's the last stage: PutHDFS throws
> > > > > "Failed to write to HDFS due to
> > > > > org.apache.nifi.processor.exception.ProcessException: IOException
> > > > > thrown from PutHDFS[id=1816e90b-ebf2-365f-c882-09f1faaec5aa]".
> > > > > See the full stack trace below.
> > > > > I have a parallelism of 1 for my PutHDFS processors. Any ideas
> > > > > why this is happening?
> > > > >
> > > > > Thanks,
> > > > > Martin
> > > > >
> > > > > 2017-05-31 13:50:29,341 ERROR [Timer-Driven Process Thread-5]
> > > > > o.apache.nifi.processors.hadoop.PutHDFS
> > > > > PutHDFS[id=1816e90b-ebf2-365f-c882-09f1faaec5aa] Failed to write
> > > > > to HDFS due to org.apache.nifi.processor.exception.ProcessException: {}
> > > > > org.apache.nifi.processor.exception.ProcessException: IOException
> > > > > thrown from PutHDFS[id=1816e90b-ebf2-365f-c882-09f1faaec5aa]:
> > > > > org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException):
> > > > > Failed to APPEND_FILE /nifi_out/unmatched/log20160930.csv for
> > > > > DFSClient_NONMAPREDUCE_-1411681085_97 on 10.128.0.7 because
> > > > > DFSClient_NONMAPREDUCE_-1411681085_97 is already the current lease holder.
> > > > >     at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:2148)
> > > > >     at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:2095)
> > > > >     at org.apache.nifi.processors.hadoop.PutHDFS$1.run(PutHDFS.java:293)
> > > > >     at java.security.AccessController.doPrivileged(Native Method)
> > > > >     at javax.security.auth.Subject.doAs(Subject.java:360)
> > > > >     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1678)
> > > > >     at org.apache.nifi.processors.hadoop.PutHDFS.onTrigger(PutHDFS.java:223)
> > > > >     at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
> > > > >     at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1118)
> > > > >     at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:144)
> > > > >     at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
> > > > >     at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:132)
> > > > >     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> > > > >     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
> > > > >     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
> > > > >     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
> > > > >     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> > > > >     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> > > > >     at java.lang.Thread.run(Thread.java:748)
> > > > > Caused by: org.apache.hadoop.ipc.RemoteException: Failed to APPEND_FILE
> > > > > /nifi_out/unmatched/log20160930.csv for
> > > > > DFSClient_NONMAPREDUCE_-1411681085_97 on 10.128.0.7 because
> > > > > DFSClient_NONMAPREDUCE_-1411681085_97 is already the current lease holder.
> > > > >     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:2882)
> > > > >     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFileInternal(FSNamesystem.java:2683)
> > > > >     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFileInt(FSNamesystem.java:2982)
> > > > >     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFile(FSNamesystem.java:2950)
> > > > >     at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.append(NameNodeRpcServer.java:655)
> > > > >     at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.append(ClientNamenodeProtocolServerSideTranslatorPB.java:421)
> > > > >     at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
> > > > >     at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
> > > > >     at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
> > > > >     at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
> > > > >     at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
> > > > >     at java.security.AccessController.doPrivileged(Native Method)
> > > > >     at javax.security.auth.Subject.doAs(Subject.java:422)
> > > > >     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
> > > > >     at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043)
> > > > >     at org.apache.hadoop.ipc.Client.call(Client.java:1475)
> > > > >     at org.apache.hadoop.ipc.Client.call(Client.java:1412)
> > > > >     at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
> > > > >     at com.sun.proxy.$Proxy188.append(Unknown Source)
> > > > >     at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.append(ClientNamenodeProtocolTranslatorPB.java:328)
> > > > >     at sun.reflect.GeneratedMethodAccessor314.invoke(Unknown Source)
> > > > >     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> > > > >     at java.lang.reflect.Method.invoke(Method.java:498)
> > > > >     at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
> > > > >     at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
> > > > >     at com.sun.proxy.$Proxy194.append(Unknown Source)
> > > > >     at org.apache.hadoop.hdfs.DFSClient.callAppend(DFSClient.java:1808)
> > > > >     at org.apache.hadoop.hdfs.DFSClient.append(DFSClient.java:1877)
> > > > >     at org.apache.hadoop.hdfs.DFSClient.append(DFSClient.java:1847)
> > > > >     at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:340)
> > > > >     at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:336)
> > > > >     at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
> > > > >     at org.apache.hadoop.hdfs.DistributedFileSystem.append(DistributedFileSystem.java:348)
> > > > >     at org.apache.hadoop.hdfs.DistributedFileSystem.append(DistributedFileSystem.java:318)
> > > > >     at org.apache.hadoop.fs.FileSystem.append(FileSystem.java:1176)
> > > > >     at org.apache.nifi.processors.hadoop.PutHDFS$1$1.process(PutHDFS.java:301)
> > > > >     at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:2125)
> > > > >     ... 18 common frames omitted
> > > > >
> > > > > On Wed, May 31, 2017 at 10:29 AM, Koji Kawamura <[email protected]> wrote:
> > > > >
> > > > >> Hi Martin,
> > > > >>
> > > > >> Generally, a NiFi processor doesn't load the entire content of a
> > > > >> file and is capable of handling huge files.
> > > > >> However, having a massive number of FlowFiles can cause OOM
> > > > >> issues, as FlowFiles and their attributes reside on the heap.
> > > > >>
> > > > >> I assume you are using a 'Line Split Count' of 1 at SplitText.
> > > > >> We recommend using multiple SplitText processors so as not to
> > > > >> generate many FlowFiles in a short period of time.
> > > > >> For example, the 1st SplitText splits files per 5,000 lines, and
> > > > >> the 2nd SplitText splits those into individual lines.
> > > > >> This way, we decrease the number of FlowFiles present at a given
> > > > >> time, requiring less heap.
> > > > >>
> > > > >> I hope this helps.
> > > > >>
> > > > >> Thanks,
> > > > >> Koji
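> > > > >>
> > > > >> Concretely, a sketch of that two-phase arrangement (the 'Line
> > > > >> Split Count' values are the ones suggested here; the line counts
> > > > >> assume the ~100-byte lines described later in this thread):
> > > > >>
> > > > >>     SplitText #1: Line Split Count = 5000
> > > > >>       (one 2.5 GB file of ~25M lines -> ~5,000 flow files)
> > > > >>     SplitText #2: Line Split Count = 1
> > > > >>       (each 5,000-line split -> 5,000 one-line flow files)
> > > > >>
> > > > >> Each process session then holds at most a few thousand new
> > > > >> FlowFile objects on the heap instead of millions.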
> > > > >>
> > > > >> On Wed, May 31, 2017 at 6:20 PM, Martin Eden <[email protected]> wrote:
> > > > >> > Hi all,
> > > > >> >
> > > > >> > I have a vanilla NiFi 1.2.0 node with 1 GB of heap.
> > > > >> >
> > > > >> > The flow I am trying to run is:
> > > > >> > ListHDFS -> FetchHDFS -> SplitText -> RouteOnContent -> MergeContent -> PutHDFS
> > > > >> >
> > > > >> > When I give it a 300 MB input zip file (2.5 GB uncompressed) I
> > > > >> > am getting a Java OutOfMemoryError, as below.
> > > > >> >
> > > > >> > Does NiFi read the entire contents of files into memory? This
> > > > >> > is unexpected; I thought it chunks through files. Giving it
> > > > >> > more RAM is not a solution, as you can always get larger input
> > > > >> > files in the future.
> > > > >> >
> > > > >> > Does this mean NiFi is not suitable as a scalable ETL solution?
> > > > >> >
> > > > >> > Can someone please explain what is happening and how to
> > > > >> > mitigate large files in NiFi? Any patterns?
> > > > >> >
> > > > >> > Thanks,
> > > > >> > M
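> > > > >> >
> > > > >> > For reference, the 1 GB heap mentioned above corresponds to
> > > > >> > the JVM arguments in NiFi's conf/bootstrap.conf (these are the
> > > > >> > standard property names; the 1g values mirror the setup
> > > > >> > described here):
> > > > >> >
> > > > >> >     java.arg.2=-Xms1g
> > > > >> >     java.arg.3=-Xmx1g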
> > > > >> >
> > > > >> > ERROR [Timer-Driven Process Thread-9]
> > > > >> > o.a.nifi.processors.standard.SplitText
> > > > >> > SplitText[id=e16939ca-f28f-1178-b66e-054e43a0a724] failed to
> > > > >> > process session due to java.lang.OutOfMemoryError: Java heap space: {}
> > > > >> > java.lang.OutOfMemoryError: Java heap space
> > > > >> >     at java.util.HashMap$EntrySet.iterator(HashMap.java:1013)
> > > > >> >     at java.util.HashMap.putMapEntries(HashMap.java:511)
> > > > >> >     at java.util.HashMap.<init>(HashMap.java:489)
> > > > >> >     at org.apache.nifi.controller.repository.StandardFlowFileRecord$Builder.initializeAttributes(StandardFlowFileRecord.java:219)
> > > > >> >     at org.apache.nifi.controller.repository.StandardFlowFileRecord$Builder.addAttributes(StandardFlowFileRecord.java:234)
> > > > >> >     at org.apache.nifi.controller.repository.StandardProcessSession.putAllAttributes(StandardProcessSession.java:1723)
> > > > >> >     at org.apache.nifi.processors.standard.SplitText.updateAttributes(SplitText.java:367)
> > > > >> >     at org.apache.nifi.processors.standard.SplitText.generateSplitFlowFiles(SplitText.java:320)
> > > > >> >     at org.apache.nifi.processors.standard.SplitText.onTrigger(SplitText.java:258)
> > > > >> >     at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
> > > > >> >     at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1118)
> > > > >> >     at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:144)
> > > > >> >     at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
> > > > >> >     at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:132)
> > > > >> >     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> > > > >> >     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
> > > > >> >     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
> > > > >> >     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
> > > > >> >     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> > > > >> >     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> > > > >> >     at java.lang.Thread.run(Thread.java:748)
