[
https://issues.apache.org/jira/browse/FLUME-2739?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14617964#comment-14617964
]
yinghua_zh commented on FLUME-2739:
-----------------------------------
The config information as follow:
client.sources = spoolsource
client.channels = mem_channel
client.sinks = test_sink
client.sources.spoolsource.type = spooldir
client.sources.spoolsource.spoolDir = /srv/BigData/journalnode/test_data
client.sources.spoolsource.fileSuffix = .COMPLETED
client.sources.spoolsource.ignorePattern = ^$
client.sources.spoolsource.trackerDir = /srv/BigData/hadoop/spoolsource/tracker1
client.sources.spoolsource.maxBlobLength = 16384
client.sources.spoolsource.batchSize = 1024
client.sources.spoolsource.inputCharset = UTF-8
client.sources.spoolsource.deserializer = LINE
client.sources.spoolsource.selector.type = replicating
client.sources.spoolsource.fileHeaderKey = file
client.sources.spoolsource.fileHeader = false
client.sources.spoolsource.basenameHeader = true
client.sources.spoolsource.basenameHeaderKey = basename
client.sources.spoolsource.deletePolicy = never
client.sources.spoolsource.deserializer.maxLineLength = 200000
client.sources.spoolsource.deserializer.maxBatchLine = 5
client.channels.mem_channel.type = memory
client.channels.mem_channel.capacity = 10240
client.channels.mem_channel.keep-alive = 3
client.channels.mem_channel.transactionCapacity = 10240
client.channels.mem_channel.byteCapacity = 1212428800
client.channels.file_2_channel.type = memory
client.channels.file_2_channel.capacity = 10240
client.channels.file_2_channel.keep-alive = 3
client.channels.file_2_channel.transactionCapacity = 10240
client.channels.file_2_channel.byteCapacity = 1212428800
client.sinks.test_sink.type = null
#client.sinks.test_sink.hostname = 51.196.115.3
#client.sinks.test_sink.port = 31001
client.sinks.test_sink.batch-size = 10240
#client.sinks.test_sink.connect-timeout = 60000
#client.sinks.test_sink.request-timeout = 60000
client.sources.spoolsource.channels = mem_channel
client.sinks.test_sink.channel = mem_channel
If the source collent one 3G file ,the problem will be occurred.
> Memory channel has not release permits from the bytesRemaining semaphore
> -------------------------------------------------------------------------
>
> Key: FLUME-2739
> URL: https://issues.apache.org/jira/browse/FLUME-2739
> Project: Flume
> Issue Type: Bug
> Components: Channel
> Affects Versions: v1.5.0
> Reporter: yinghua_zh
>
> The memory channel apply the permit when the source put and commit the event
> into the memory channel,but it does not release the permits from the
> bytesRemaining semaphore when the source commits successfully,It will not
> accepet the event for a while.it will occurs the following exceptions:
> 015-07-08 07:52:06,089 | WARN | [pool-4-thread-1] | The channel is full,
> and cannot write data now. The source will try again after 500 milliseconds
> |
> org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:276)
> org.apache.flume.ChannelException: Unable to put batch on required channel:
> org.apache.flume.channel.MemoryChannel{name: mem_channel}
> at
> org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:200)
> at
> org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:273)
> at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.flume.ChannelException: Cannot commit transaction. Byte
> capacity allocated to store event body 9.69943E8reached. Please increase heap
> space/byte capacity allocated to the channel as the sinks may not be keeping
> up with the sources
> at
> org.apache.flume.channel.MemoryChannel$MemoryTransaction.doCommit(MemoryChannel.java:131)
> at
> org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151)
> at
> org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:192)
> I modify the doCommit method ,and like this ,it does not occur exception:
> @Override
> protected void doCommit() throws InterruptedException {
> int remainingChange = takeList.size() - putList.size();
> if(remainingChange < 0) {
> if(!bytesRemaining.tryAcquire(putByteCounter, keepAlive,
> TimeUnit.SECONDS)) {
> throw new ChannelException("Cannot commit transaction. Byte
> capacity " +
> "allocated to store event body " + byteCapacity *
> byteCapacitySlotSize +
> "reached. Please increase heap space/byte capacity allocated to "
> +
> "the channel as the sinks may not be keeping up with the
> sources");
> }
> if(!queueRemaining.tryAcquire(-remainingChange, keepAlive,
> TimeUnit.SECONDS)) {
> bytesRemaining.release(putByteCounter);
> throw new ChannelFullException("Space for commit to queue couldn't
> be acquired." +
> " Sinks are likely not keeping up with sources, or the buffer
> size is too tight");
> }
> }
> int puts = putList.size();
> int takes = takeList.size();
> synchronized(queueLock) {
> if(puts > 0 ) {
> while(!putList.isEmpty()) {
> if(!queue.offer(putList.removeFirst())) {
> throw new RuntimeException("Queue add failed, this shouldn't be
> able to happen");
> }
> }
> bytesRemaining.release(putByteCounter);
> }
> putList.clear();
> takeList.clear();
> }
> bytesRemaining.release(takeByteCounter);
> takeByteCounter = 0;
> putByteCounter = 0;
> queueStored.release(puts);
> if(remainingChange > 0) {
> queueRemaining.release(remainingChange);
> }
> if (puts > 0) {
> channelCounter.addToEventPutSuccessCount(puts);
> }
> if (takes > 0) {
> channelCounter.addToEventTakeSuccessCount(takes);
> }
> channelCounter.setChannelSize(queue.size());
> }
> I add the code "bytesRemaining.release(putByteCounter);" after the event take
> the putList to queue.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)