[
https://issues.apache.org/jira/browse/FLUME-2403?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14032432#comment-14032432
]
Adam Gent commented on FLUME-2403:
----------------------------------
It turns out the message was 4MB (I had confused it with a different message earlier).
While that is a large message, my hope would still be that the size of the message
is detected and that at most one whole message ever has to be held in memory
(although ideally, in my opinion, even that should not be necessary).
I looked at the code and it appears that the {{Event}} is copied into memory two to
three times for the File Channel: first when the Event is created from the source,
and again by {{TransactionEventRecord.toByteBuffer}}. I think the buffer size
detection needs to account for the Event being copied that many times (rough sketch
of the pattern below).
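To illustrate the pattern (a rough sketch only, not Flume's actual code), one large
body can easily end up buffered several times at once:
{code}
// Rough sketch only, not Flume's actual code: how a single large event body can be
// buffered in memory several times on its way into the File Channel.
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;

public class CopyCountSketch {
    static ByteBuffer toByteBuffer(byte[] body) {                // copy #1: the body held by the Event
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        bos.write(body, 0, body.length);                          // copy #2: re-buffered during serialization
        return ByteBuffer.wrap(bos.toByteArray());                // copy #3: toByteArray() duplicates again
    }
}
{code}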
It also seems a fundamental flaw in Flume that, because the interfaces use a
{{byte[]}} for the body
(https://flume.apache.org/releases/content/1.5.0/apidocs/org/apache/flume/Event.html,
shown below), an Event has to be stored in memory in its entirety at least once.
This would be like requiring servlet containers to buffer multipart forms entirely
in memory.
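For reference, the interface in question looks roughly like this (per the linked
Javadoc), with the body exposed only as a whole {{byte[]}}:
{code}
// The org.apache.flume.Event interface (roughly, per the 1.5.0 Javadoc): the body is
// a single byte[], so the whole payload must be materialized in memory.
import java.util.Map;

public interface Event {
    Map<String, String> getHeaders();
    void setHeaders(Map<String, String> headers);
    byte[] getBody();
    void setBody(byte[] body);
}
{code}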
We could perhaps develop a policy on our end to make sure gigantic messages don't
get through (I'll have to look at the interceptors; a rough sketch follows below),
but it still seems wasteful and not very robust when handling messages that are
large, but not that large.
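Something along these lines might serve as that policy; the class name and the 1 MB
threshold below are my own sketch, not an existing Flume interceptor:
{code}
// Rough sketch of a custom interceptor that drops events whose body exceeds a size limit.
// The class name and threshold are illustrative only.
import java.util.ArrayList;
import java.util.List;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

public class MaxBodySizeInterceptor implements Interceptor {

    private static final int MAX_BODY_BYTES = 1024 * 1024; // 1 MB, illustrative limit

    @Override
    public void initialize() { }

    @Override
    public Event intercept(Event event) {
        byte[] body = event.getBody();
        // Returning null drops the event from the flow.
        return (body != null && body.length > MAX_BODY_BYTES) ? null : event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        List<Event> kept = new ArrayList<Event>(events.size());
        for (Event e : events) {
            Event out = intercept(e);
            if (out != null) {
                kept.add(out);
            }
        }
        return kept;
    }

    @Override
    public void close() { }

    public static class Builder implements Interceptor.Builder {
        @Override
        public void configure(Context context) { }

        @Override
        public Interceptor build() {
            return new MaxBodySizeInterceptor();
        }
    }
}
{code}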
> Spillable Memory Channel causes OOME for large messages.
> --------------------------------------------------------
>
> Key: FLUME-2403
> URL: https://issues.apache.org/jira/browse/FLUME-2403
> Project: Flume
> Issue Type: Bug
> Components: Channel
> Affects Versions: v1.5.0
> Reporter: Adam Gent
> Priority: Critical
>
> The spillable memory channel will fail rather badly on large messages.
> {code}
> Error while writing to required channel: FileChannel es1 { dataDirs: [/var/lib/flume/data] }
> java.lang.OutOfMemoryError: Java heap space
>     at java.util.Arrays.copyOf(Arrays.java:2271)
>     at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
>     at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
>     at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
>     at com.google.protobuf.CodedOutputStream.writeRawBytes(CodedOutputStream.java:984)
>     at com.google.protobuf.CodedOutputStream.writeRawBytes(CodedOutputStream.java:905)
>     at com.google.protobuf.CodedOutputStream.writeBytesNoTag(CodedOutputStream.java:386)
>     at com.google.protobuf.CodedOutputStream.writeBytes(CodedOutputStream.java:229)
>     at org.apache.flume.channel.file.proto.ProtosFactory$FlumeEvent.writeTo(ProtosFactory.java:6259)
>     at com.google.protobuf.CodedOutputStream.writeMessageNoTag(CodedOutputStream.java:380)
>     at com.google.protobuf.CodedOutputStream.writeMessage(CodedOutputStream.java:222)
>     at org.apache.flume.channel.file.proto.ProtosFactory$Put.writeTo(ProtosFactory.java:4112)
>     at com.google.protobuf.AbstractMessageLite.writeDelimitedTo(AbstractMessageLite.java:90)
>     at org.apache.flume.channel.file.Put.writeProtos(Put.java:93)
>     at org.apache.flume.channel.file.TransactionEventRecord.toByteBuffer(TransactionEventRecord.java:174)
>     at org.apache.flume.channel.file.Log.put(Log.java:611)
>     at org.apache.flume.channel.file.FileChannel$FileBackedTransaction.doPut(FileChannel.java:458)
>     at org.apache.flume.channel.BasicTransactionSemantics.put(BasicTransactionSemantics.java:93)
>     at org.apache.flume.channel.SpillableMemoryChannel$SpillableMemoryTransaction.commitPutsToOverflow(SpillableMemoryChannel.java:490)
>     at org.apache.flume.channel.SpillableMemoryChannel$SpillableMemoryTransaction.putCommit(SpillableMemoryChannel.java:480)
>     at org.apache.flume.channel.SpillableMemoryChannel$SpillableMemoryTransaction.doCommit(SpillableMemoryChannel.java:401)
>     at org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151)
>     at org.apache.flume.channel.ChannelProcessor.processEvent(ChannelProcessor.java:267)
>     at org.apache.flume.source.rabbitmq.RabbitMQSource.process(RabbitMQSource.java:162)
>     at org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:139)
>     at java.lang.Thread.run(Thread.java:744)
> {code}
> My config:
> {code}
> agent.channels.es1.type = SPILLABLEMEMORY
> agent.channels.es1.memoryCapacity = 10000
> agent.channels.es1.overflowCapacity = 1000000
> agent.channels.es1.byteCapacity = 800000
> agent.channels.es1.checkpointDir = /var/lib/flume/checkpoint
> agent.channels.es1.dataDirs = /var/lib/flume/data
> {code}
> I haven't looked at the code, but I have some concerns, such as why a
> ByteArrayOutputStream is used instead of some other buffered stream writing
> directly to the file system. Perhaps it's because of the transactional nature,
> but I'm pretty sure you can write to the filesystem and still roll back, as Kafka
> and modern databases do this with fsync (a rough sketch follows below).
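> What I have in mind, as a rough, hypothetical sketch (not how the File Channel
> actually works): stream the body to disk, fsync on commit, and roll back by
> truncating to the position where the write began.
> {code}
> // Hypothetical sketch, not Flume code: write-then-fsync with rollback by truncation.
> import java.io.IOException;
> import java.nio.ByteBuffer;
> import java.nio.channels.FileChannel;
> import java.nio.file.Paths;
> import java.nio.file.StandardOpenOption;
>
> public class DurableAppendSketch {
>     static void append(byte[] body) throws IOException {
>         try (FileChannel ch = FileChannel.open(Paths.get("/var/lib/flume/data/log"),
>                 StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
>             long mark = ch.size();               // remember where this write begins
>             ch.position(mark);
>             try {
>                 ch.write(ByteBuffer.wrap(body)); // could also be streamed in small chunks
>                 ch.force(true);                  // fsync: make the write durable (commit)
>             } catch (IOException e) {
>                 ch.truncate(mark);               // rollback: discard the partial write
>                 throw e;
>             }
>         }
>     }
> }
> {code}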
> One could argue that I should just raise the max heap, but this message is coming
> from RabbitMQ, which had no issue holding on to it (I believe the message is
> around 500K).
--
This message was sent by Atlassian JIRA
(v6.2#6252)