[ 
https://issues.apache.org/jira/browse/FLUME-2403?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14036012#comment-14036012
 ] 

Adam Gent commented on FLUME-2403:
----------------------------------

I updated the title. 

{quote}Curious how do you plan to measure the size of the incoming event in the 
source w/o actually making a copy of it into memory.{quote}

Just like many other protocols: I would hope that a header would be set with 
the payload size. For example, this is how HTTP works: a header describes the 
body size, or indicates that the body is streamed. The headers would obviously 
always be loaded into memory, but the body might not be.

I realize that is not how the Flume {{Event}} currently works, so my only 
option for size filtering in Flume is a custom {{Source}} that reads the 
headers from the original source (AMQP or whatever) to decide whether it 
should drop the message.
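
To make the header idea concrete, here is a minimal sketch in plain Java. It 
is not Flume API: the {{SizeFilter}} class and the {{content-length}} header 
name are hypothetical, and the policy for missing or malformed sizes is just 
one possible choice. The point is only that the decision needs the headers, 
not the body.

```java
import java.util.Map;

// Hypothetical helper, not part of Flume: decides from headers alone
// whether a message is too large to accept.
final class SizeFilter {
    // Assumed header name; a real source (AMQP, HTTP, ...) may use another.
    static final String SIZE_HEADER = "content-length";

    private final long maxBodyBytes;

    SizeFilter(long maxBodyBytes) {
        this.maxBodyBytes = maxBodyBytes;
    }

    /** Returns true if the declared body size exceeds the limit. */
    boolean shouldDrop(Map<String, String> headers) {
        String declared = headers.get(SIZE_HEADER);
        if (declared == null) {
            // No size declared: this sketch accepts the message.
            return false;
        }
        try {
            return Long.parseLong(declared.trim()) > maxBodyBytes;
        } catch (NumberFormatException e) {
            // Unparseable size: this sketch treats it as suspect and drops.
            return true;
        }
    }
}
```

A custom {{Source}} could call something like {{shouldDrop}} before it ever 
reads the body from the upstream broker.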

Given that our average message size is about 4K (and not 4MB), I am curious 
what sort of performance we would get by deferring loading of the body until 
it is absolutely needed, and then pulling the byte[] from the filesystem or a 
configurable buffer (mmap). In other words, no copying and a lazy byte[], 
since I can't change the Event contract. Perhaps a workaround is even special 
headers that tell the FileChannel where to pick up the body from a file 
created by the Source. This would avoid keeping the byte[] in memory for long 
and rely on the kernel fs cache instead.
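
A rough sketch of the lazy body idea, assuming the Source has already written 
the body to a file and knows its offset and length. {{LazyBody}} is a 
hypothetical class, not something in Flume, and {{FileChannel}} below is 
{{java.nio.channels.FileChannel}}, not the Flume channel. The byte[] is only 
materialized when {{load()}} is called, so until then only the path, offset 
and length sit on the heap.

```java
import java.io.IOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical holder for a body that lives in a file until it is needed.
final class LazyBody {
    private final Path file;
    private final long offset;
    private final int length;

    LazyBody(Path file, long offset, int length) {
        this.file = file;
        this.offset = offset;
        this.length = length;
    }

    /** Size is known up front, so size-based decisions need no I/O. */
    int length() {
        return length;
    }

    /** Maps the region read-only and copies it into a fresh byte[] on demand. */
    byte[] load() throws IOException {
        try (FileChannel ch = FileChannel.open(file, StandardOpenOption.READ)) {
            MappedByteBuffer buf = ch.map(FileChannel.MapMode.READ_ONLY, offset, length);
            byte[] body = new byte[length];
            buf.get(body);
            return body;
        }
    }
}
```

This still makes one copy at the moment the body is requested, because the 
Event contract hands out a byte[]. What it avoids is holding that array for 
the whole time the event waits in the channel.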



> FileChannel causes OOME for large messages.
> -------------------------------------------
>
>                 Key: FLUME-2403
>                 URL: https://issues.apache.org/jira/browse/FLUME-2403
>             Project: Flume
>          Issue Type: Bug
>          Components: Channel
>    Affects Versions: v1.5.0
>            Reporter: Adam Gent
>            Priority: Minor
>
> The spillable memory channel will fail rather badly on large messages.
> {code}
> Error while writing to required channel: FileChannel es1 { dataDirs: [/var/lib/flume/data] }
> java.lang.OutOfMemoryError: Java heap space
>         at java.util.Arrays.copyOf(Arrays.java:2271)
>         at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
>         at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
>         at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
>         at com.google.protobuf.CodedOutputStream.writeRawBytes(CodedOutputStream.java:984)
>         at com.google.protobuf.CodedOutputStream.writeRawBytes(CodedOutputStream.java:905)
>         at com.google.protobuf.CodedOutputStream.writeBytesNoTag(CodedOutputStream.java:386)
>         at com.google.protobuf.CodedOutputStream.writeBytes(CodedOutputStream.java:229)
>         at org.apache.flume.channel.file.proto.ProtosFactory$FlumeEvent.writeTo(ProtosFactory.java:6259)
>         at com.google.protobuf.CodedOutputStream.writeMessageNoTag(CodedOutputStream.java:380)
>         at com.google.protobuf.CodedOutputStream.writeMessage(CodedOutputStream.java:222)
>         at org.apache.flume.channel.file.proto.ProtosFactory$Put.writeTo(ProtosFactory.java:4112)
>         at com.google.protobuf.AbstractMessageLite.writeDelimitedTo(AbstractMessageLite.java:90)
>         at org.apache.flume.channel.file.Put.writeProtos(Put.java:93)
>         at org.apache.flume.channel.file.TransactionEventRecord.toByteBuffer(TransactionEventRecord.java:174)
>         at org.apache.flume.channel.file.Log.put(Log.java:611)
>         at org.apache.flume.channel.file.FileChannel$FileBackedTransaction.doPut(FileChannel.java:458)
>         at org.apache.flume.channel.BasicTransactionSemantics.put(BasicTransactionSemantics.java:93)
>         at org.apache.flume.channel.SpillableMemoryChannel$SpillableMemoryTransaction.commitPutsToOverflow(SpillableMemoryChannel.java:490)
>         at org.apache.flume.channel.SpillableMemoryChannel$SpillableMemoryTransaction.putCommit(SpillableMemoryChannel.java:480)
>         at org.apache.flume.channel.SpillableMemoryChannel$SpillableMemoryTransaction.doCommit(SpillableMemoryChannel.java:401)
>         at org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151)
>         at org.apache.flume.channel.ChannelProcessor.processEvent(ChannelProcessor.java:267)
>         at org.apache.flume.source.rabbitmq.RabbitMQSource.process(RabbitMQSource.java:162)
>         at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:139)
>         at java.lang.Thread.run(Thread.java:744)
> {code}
> My config:
> {code}
> agent.channels.es1.type = SPILLABLEMEMORY
> agent.channels.es1.memoryCapacity = 10000
> agent.channels.es1.overflowCapacity = 1000000
> agent.channels.es1.byteCapacity = 800000
> agent.channels.es1.checkpointDir = /var/lib/flume/checkpoint
> agent.channels.es1.dataDirs = /var/lib/flume/data
> {code}
> I haven't looked at the code, but I have some concerns, such as why a 
> ByteArrayOutputStream is being used instead of some other buffered stream 
> writing directly to the file system. Perhaps it's because of the 
> transactional nature, but I'm pretty sure you can write to the filesystem and 
> roll back, as Kafka and modern databases do this with fsync.
> One could argue that I should just raise the max heap, but this message is 
> coming from RabbitMQ, which had no issue holding on to the message (I 
> believe the message is about 500K).



--
This message was sent by Atlassian JIRA
(v6.2#6252)
