[
https://issues.apache.org/jira/browse/FLUME-2403?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14033935#comment-14033935
]
Adam Gent edited comment on FLUME-2403 at 6/17/14 3:48 PM:
-----------------------------------------------------------
While I can certainly block the data with the interceptor its almost too late
by then as its byte[]. I still think the memory or object size detection is off
given another bug just filed after mine (no association with the person who
filed that) https://issues.apache.org/jira/browse/FLUME-2404
Is there anywhere to intercept in a streaming fashion or perhaps the event
headers could be read first to determine if the message is too big. While not
handling large messages is not a flume responsibility I would hope reliability
is.
Given that today's filesystems are now often solid state drives and kernel fs
caching architecture I think a streaming (ie InputStream with payload size API)
in the future would be beneficial. We have done benchmarks on our side and it
can be shockingly more efficient to copy the data raw to the filesystem then
copying it a bunch of times in memory even with smaller objects given (There is
a great article on that here:
https://kafka.apache.org/documentation.html#persistence).
For now I am generally happy with flume and I'll join the mailinglist to
perhaps discuss more (as the above is slightly off topic).
was (Author: agentgt):
While I can certainly block the data with the interceptor its almost too late
by then as its byte[]. I still think the memory or object size detection is off
given another bug just filed after mine (no association with the person who
filed that) https://issues.apache.org/jira/browse/FLUME-2404
Is there anywhere to intercept in a streaming fashion or perhaps the event
headers could be read first to determine if the message is too big. While not
handling large messages is not a flume responsibility I would hope reliability
is.
Given that today's filesystems are now often solid state drives and kernel fs
caching architecture I think a streaming (ie InputStream with payload size API)
in the future would be beneficial. We have done benchmarks on our side and it
can be shockingly more efficient to copy the data raw to the filesystem then
copying it a bunch of times in memory even with smaller objects given (There is
a great article on that here:
http://engineering.linkedin.com/distributed-systems/log-what-every-software-engineer-should-know-about-real-time-datas-unifying).
For now I am generally happy with flume and I'll join the mailinglist to
perhaps discuss more (as the above is slightly off topic).
> Spillable Memory Channel causes OOME for large messages.
> --------------------------------------------------------
>
> Key: FLUME-2403
> URL: https://issues.apache.org/jira/browse/FLUME-2403
> Project: Flume
> Issue Type: Bug
> Components: Channel
> Affects Versions: v1.5.0
> Reporter: Adam Gent
> Priority: Critical
>
> The spillable memory channel will fail rather badly on large messages.
> {code}
> Error while writing to required channel: FileChannel es1 { dataDirs:
> [/var/lib/flume/data] }
> java.lang.OutOfMemoryError: Java heap space
> at java.util.Arrays.copyOf(Arrays.java:2271)
> at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
> at
> java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
> at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
> at
> com.google.protobuf.CodedOutputStream.writeRawBytes(CodedOutputStream.java:984)
> at
> com.google.protobuf.CodedOutputStream.writeRawBytes(CodedOutputStream.java:905)
> at
> com.google.protobuf.CodedOutputStream.writeBytesNoTag(CodedOutputStream.java:386)
> at
> com.google.protobuf.CodedOutputStream.writeBytes(CodedOutputStream.java:229)
> at
> org.apache.flume.channel.file.proto.ProtosFactory$FlumeEvent.writeTo(ProtosFactory.java:6259)
> at
> com.google.protobuf.CodedOutputStream.writeMessageNoTag(CodedOutputStream.java:380)
> at
> com.google.protobuf.CodedOutputStream.writeMessage(CodedOutputStream.java:222)
> at
> org.apache.flume.channel.file.proto.ProtosFactory$Put.writeTo(ProtosFactory.java:4112)
> at
> com.google.protobuf.AbstractMessageLite.writeDelimitedTo(AbstractMessageLite.java:90)
> at org.apache.flume.channel.file.Put.writeProtos(Put.java:93)
> at
> org.apache.flume.channel.file.TransactionEventRecord.toByteBuffer(TransactionEventRecord.java:174)
> at org.apache.flume.channel.file.Log.put(Log.java:611)
> at
> org.apache.flume.channel.file.FileChannel$FileBackedTransaction.doPut(FileChannel.java:458)
> at
> org.apache.flume.channel.BasicTransactionSemantics.put(BasicTransactionSemantics.java:93)
> at
> org.apache.flume.channel.SpillableMemoryChannel$SpillableMemoryTransaction.commitPutsToOverflow(SpillableMemoryChannel.java:490)
> at
> org.apache.flume.channel.SpillableMemoryChannel$SpillableMemoryTransaction.putCommit(SpillableMemoryChannel.java:480)
> at
> org.apache.flume.channel.SpillableMemoryChannel$SpillableMemoryTransaction.doCommit(SpillableMemoryChannel.java:401)
> at
> org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151)
> at
> org.apache.flume.channel.ChannelProcessor.processEvent(ChannelProcessor.java:267)
> at
> org.apache.flume.source.rabbitmq.RabbitMQSource.process(RabbitMQSource.java:162)
> at
> org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:139)
> at java.lang.Thread.run(Thread.java:744)
> {code}
> My config:
> {code}
> agent.channels.es1.type = SPILLABLEMEMORY
> agent.channels.es1.memoryCapacity = 10000
> agent.channels.es1.overflowCapacity = 1000000
> agent.channels.es1.byteCapacity = 800000
> agent.channels.es1.checkpointDir = /var/lib/flume/checkpoint
> agent.channels.es1.dataDirs = /var/lib/flume/data
> {code}
> I haven't looked at the code but I have some concerns like why a
> ByteArrayOutputStream is being used instead of some other buffered stream
> directly to the file system? Perhaps its because of the transactional nature
> but I'm pretty sure you can write to the filesystem and rollback as Kafka and
> modern databases do this with fsync.
> One could argue that I should just raise the max heap but this message is
> coming from a RabbitMQ which had no issue holding on to the message (I
> believe the message is like 500K).
--
This message was sent by Atlassian JIRA
(v6.2#6252)