[ 
https://issues.apache.org/jira/browse/FLUME-2739?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14617964#comment-14617964
 ] 

yinghua_zh edited comment on FLUME-2739 at 7/8/15 4:49 AM:
-----------------------------------------------------------

The configuration is as follows:
client.sources = spoolsource 
client.channels = mem_channel 
client.sinks = test_sink 

client.sources.spoolsource.type = spooldir
client.sources.spoolsource.spoolDir = /srv/BigData/journalnode/test_data
client.sources.spoolsource.fileSuffix = .COMPLETED
client.sources.spoolsource.ignorePattern = ^$
client.sources.spoolsource.trackerDir = /srv/BigData/hadoop/spoolsource/tracker1
client.sources.spoolsource.maxBlobLength = 16384
client.sources.spoolsource.batchSize = 1024
client.sources.spoolsource.inputCharset = UTF-8
client.sources.spoolsource.deserializer = LINE
client.sources.spoolsource.selector.type = replicating
client.sources.spoolsource.fileHeaderKey = file
client.sources.spoolsource.fileHeader = false
client.sources.spoolsource.basenameHeader = true
client.sources.spoolsource.basenameHeaderKey = basename
client.sources.spoolsource.deletePolicy = never
client.sources.spoolsource.deserializer.maxLineLength = 200000 
client.sources.spoolsource.deserializer.maxBatchLine = 5   

             


client.channels.mem_channel.type = memory
client.channels.mem_channel.capacity = 10240
client.channels.mem_channel.keep-alive = 3
client.channels.mem_channel.transactionCapacity = 10240
client.channels.mem_channel.byteCapacity = 1212428800


client.sinks.test_sink.type = null
#client.sinks.test_sink.hostname = 51.196.115.3
#client.sinks.test_sink.port = 31001
client.sinks.test_sink.batch-size = 10240
#client.sinks.test_sink.connect-timeout =  60000
#client.sinks.test_sink.request-timeout = 60000



client.sources.spoolsource.channels = mem_channel
client.sinks.test_sink.channel = mem_channel


If the source collects a single 3 GB file, the problem occurs.
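
For reference, the 9.69943E8 figure in the exception text can be reproduced from the byteCapacity value configured above. This is only a sketch: it assumes the default byteCapacityBufferPercentage of 20 (the config does not set it) and a slot size of 100 bytes, which is my reading of the Flume 1.5 MemoryChannel source. The class and method names are made up for illustration.

```java
public class ByteCapacitySketch {
    // Assumption: MemoryChannel counts body bytes in slots of 100 bytes.
    static final double SLOT_SIZE = 100;

    // Number of semaphore permits left after reserving the buffer percentage
    // (assumed default: 20) for event headers.
    static int effectiveSlots(long configuredBytes, int bufferPercentage) {
        return (int) ((configuredBytes * (1 - bufferPercentage * .01)) / SLOT_SIZE);
    }

    // The capacity as it would appear in the exception message
    // (slots multiplied by the double slot size, hence the E-notation).
    static String reportedCapacity(long configuredBytes, int bufferPercentage) {
        return String.valueOf(effectiveSlots(configuredBytes, bufferPercentage) * SLOT_SIZE);
    }

    public static void main(String[] args) {
        // byteCapacity = 1212428800 from the config above
        System.out.println(effectiveSlots(1212428800L, 20));   // 9699430
        System.out.println(reportedCapacity(1212428800L, 20)); // 9.69943E8
    }
}
```

So with this config the channel has roughly 970 MB of body capacity, well below the size of a 3 GB input file.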


was (Author: yinghua_zh):
The configuration is as follows:
client.sources = spoolsource 
client.channels = mem_channel 
client.sinks = test_sink 

client.sources.spoolsource.type = spooldir
client.sources.spoolsource.spoolDir = /srv/BigData/journalnode/test_data
client.sources.spoolsource.fileSuffix = .COMPLETED
client.sources.spoolsource.ignorePattern = ^$
client.sources.spoolsource.trackerDir = /srv/BigData/hadoop/spoolsource/tracker1
client.sources.spoolsource.maxBlobLength = 16384
client.sources.spoolsource.batchSize = 1024
client.sources.spoolsource.inputCharset = UTF-8
client.sources.spoolsource.deserializer = LINE
client.sources.spoolsource.selector.type = replicating
client.sources.spoolsource.fileHeaderKey = file
client.sources.spoolsource.fileHeader = false
client.sources.spoolsource.basenameHeader = true
client.sources.spoolsource.basenameHeaderKey = basename
client.sources.spoolsource.deletePolicy = never
client.sources.spoolsource.deserializer.maxLineLength = 200000 
client.sources.spoolsource.deserializer.maxBatchLine = 5   

             


client.channels.mem_channel.type = memory
client.channels.mem_channel.capacity = 10240
client.channels.mem_channel.keep-alive = 3
client.channels.mem_channel.transactionCapacity = 10240
client.channels.mem_channel.byteCapacity = 1212428800

client.channels.file_2_channel.type = memory
client.channels.file_2_channel.capacity = 10240
client.channels.file_2_channel.keep-alive = 3
client.channels.file_2_channel.transactionCapacity = 10240
client.channels.file_2_channel.byteCapacity = 1212428800


client.sinks.test_sink.type = null
#client.sinks.test_sink.hostname = 51.196.115.3
#client.sinks.test_sink.port = 31001
client.sinks.test_sink.batch-size = 10240
#client.sinks.test_sink.connect-timeout =  60000
#client.sinks.test_sink.request-timeout = 60000



client.sources.spoolsource.channels = mem_channel
client.sinks.test_sink.channel = mem_channel


If the source collects a single 3 GB file, the problem occurs.

> Memory channel has not release permits from the bytesRemaining semaphore 
> -------------------------------------------------------------------------
>
>                 Key: FLUME-2739
>                 URL: https://issues.apache.org/jira/browse/FLUME-2739
>             Project: Flume
>          Issue Type: Bug
>          Components: Channel
>    Affects Versions: v1.5.0
>            Reporter: yinghua_zh
>
> The memory channel acquires permits from the bytesRemaining semaphore when
> a source puts and commits events into the channel, but it does not release
> those permits after the source commits successfully. As a result, the
> channel stops accepting events for a while and the following exception occurs:
> 015-07-08 07:52:06,089 | WARN  | [pool-4-thread-1] |  The channel is full, and cannot write data now. The source will try again after 500 milliseconds  | org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:276)
> org.apache.flume.ChannelException: Unable to put batch on required channel: org.apache.flume.channel.MemoryChannel{name: mem_channel}
>       at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:200)
>       at org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:273)
>       at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>       at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>       at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>       at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.flume.ChannelException: Cannot commit transaction. Byte capacity allocated to store event body 9.69943E8reached. Please increase heap space/byte capacity allocated to the channel as the sinks may not be keeping up with the sources
>       at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doCommit(MemoryChannel.java:131)
>       at org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151)
>       at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:192)
> I modified the doCommit method as follows, and the exception no longer occurs:
>     @Override
>     protected void doCommit() throws InterruptedException {
>       int remainingChange = takeList.size() - putList.size();
>       if(remainingChange < 0) {
>         if(!bytesRemaining.tryAcquire(putByteCounter, keepAlive,
>           TimeUnit.SECONDS)) {
>           throw new ChannelException("Cannot commit transaction. Byte capacity " +
>             "allocated to store event body " + byteCapacity * byteCapacitySlotSize +
>             "reached. Please increase heap space/byte capacity allocated to " +
>             "the channel as the sinks may not be keeping up with the sources");
>         }
>         if(!queueRemaining.tryAcquire(-remainingChange, keepAlive, TimeUnit.SECONDS)) {
>           bytesRemaining.release(putByteCounter);
>           throw new ChannelFullException("Space for commit to queue couldn't be acquired." +
>               " Sinks are likely not keeping up with sources, or the buffer size is too tight");
>         }
>       }
>       int puts = putList.size();
>       int takes = takeList.size();
>       synchronized(queueLock) {
>         if(puts > 0 ) {
>           while(!putList.isEmpty()) {
>             if(!queue.offer(putList.removeFirst())) {
>               throw new RuntimeException("Queue add failed, this shouldn't be able to happen");
>             }
>           }
>           bytesRemaining.release(putByteCounter);
>         }
>         putList.clear();
>         takeList.clear();
>       }
>       bytesRemaining.release(takeByteCounter);
>       takeByteCounter = 0;
>       putByteCounter = 0;
>       queueStored.release(puts);
>       if(remainingChange > 0) {
>         queueRemaining.release(remainingChange);
>       }
>       if (puts > 0) {
>         channelCounter.addToEventPutSuccessCount(puts);
>       }
>       if (takes > 0) {
>         channelCounter.addToEventTakeSuccessCount(takes);
>       }
>       channelCounter.setChannelSize(queue.size());
>     }
> I added the line "bytesRemaining.release(putByteCounter);" after the events
> are moved from the putList to the queue.
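
To make the permit accounting described above easier to follow, here is a minimal standalone sketch of a byte budget backed by a semaphore. It is not Flume code: the class and method names are hypothetical, and it assumes permits are acquired when a put commits and handed back only when a take commits (which is what the release of takeByteCounter in the quoted method does).

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class ByteBudgetSketch {
    // Hypothetical stand-in for the channel's bytesRemaining semaphore.
    private final Semaphore bytesRemaining;

    public ByteBudgetSketch(int slots) {
        this.bytesRemaining = new Semaphore(slots);
    }

    /** Put-side commit: reserve slots, waiting up to timeoutMillis for them. */
    public boolean commitPut(int slots, long timeoutMillis) throws InterruptedException {
        return bytesRemaining.tryAcquire(slots, timeoutMillis, TimeUnit.MILLISECONDS);
    }

    /** Take-side commit: hand the slots back to the budget. */
    public void commitTake(int slots) {
        bytesRemaining.release(slots);
    }

    public int available() {
        return bytesRemaining.availablePermits();
    }

    public static void main(String[] args) throws InterruptedException {
        ByteBudgetSketch channel = new ByteBudgetSketch(10);
        System.out.println(channel.commitPut(6, 10)); // true, 4 slots left
        System.out.println(channel.commitPut(6, 10)); // false, times out: only 4 slots left
        channel.commitTake(6);                        // sink drains the first batch
        System.out.println(channel.commitPut(6, 10)); // true again
    }
}
```

In this model a put can only succeed again after a take has committed. Releasing the permits at put time instead, as in the modification above, avoids the timeout, but the budget then no longer bounds the bytes held in the queue.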



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
