[ 
https://issues.apache.org/jira/browse/FLUME-2739?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yinghua_zh updated FLUME-2739:
------------------------------
    Summary: Memory channel does not release the put permits from the 
bytesRemaining semaphore   (was: Memory channel has not release permits from 
the bytesRemaining semaphore )

> Memory channel does not release the put permits from the bytesRemaining 
> semaphore 
> ----------------------------------------------------------------------------------
>
>                 Key: FLUME-2739
>                 URL: https://issues.apache.org/jira/browse/FLUME-2739
>             Project: Flume
>          Issue Type: Bug
>          Components: Channel
>    Affects Versions: v1.5.0
>            Reporter: yinghua_zh
>
> The memory channel acquires permits from the bytesRemaining semaphore when 
> a source puts events and commits the transaction, but it does not release 
> those permits after the commit succeeds. As a result, the channel stops 
> accepting events for a while and the following exception occurs:
> 015-07-08 07:52:06,089 | WARN  | [pool-4-thread-1] |  The channel is full, and cannot write data now. The source will try again after 500 milliseconds  | org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:276)
> org.apache.flume.ChannelException: Unable to put batch on required channel: org.apache.flume.channel.MemoryChannel{name: mem_channel}
>       at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:200)
>       at org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:273)
>       at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>       at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>       at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>       at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.flume.ChannelException: Cannot commit transaction. Byte capacity allocated to store event body 9.69943E8reached. Please increase heap space/byte capacity allocated to the channel as the sinks may not be keeping up with the sources
>       at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doCommit(MemoryChannel.java:131)
>       at org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151)
>       at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:192)
> I modified the doCommit method as follows, and the exception no longer occurs:
>     @Override
>     protected void doCommit() throws InterruptedException {
>       int remainingChange = takeList.size() - putList.size();
>       if(remainingChange < 0) {
>         if(!bytesRemaining.tryAcquire(putByteCounter, keepAlive,
>           TimeUnit.SECONDS)) {
>           throw new ChannelException("Cannot commit transaction. Byte capacity " +
>             "allocated to store event body " + byteCapacity * byteCapacitySlotSize +
>             "reached. Please increase heap space/byte capacity allocated to " +
>             "the channel as the sinks may not be keeping up with the sources");
>         }
>         if(!queueRemaining.tryAcquire(-remainingChange, keepAlive, TimeUnit.SECONDS)) {
>           bytesRemaining.release(putByteCounter);
>           throw new ChannelFullException("Space for commit to queue couldn't be acquired." +
>               " Sinks are likely not keeping up with sources, or the buffer size is too tight");
>         }
>       }
>       int puts = putList.size();
>       int takes = takeList.size();
>       synchronized(queueLock) {
>         if(puts > 0 ) {
>           while(!putList.isEmpty()) {
>             if(!queue.offer(putList.removeFirst())) {
>               throw new RuntimeException("Queue add failed, this shouldn't be able to happen");
>             }
>           }
>           bytesRemaining.release(putByteCounter);
>         }
>         putList.clear();
>         takeList.clear();
>       }
>       bytesRemaining.release(takeByteCounter);
>       takeByteCounter = 0;
>       putByteCounter = 0;
>       queueStored.release(puts);
>       if(remainingChange > 0) {
>         queueRemaining.release(remainingChange);
>       }
>       if (puts > 0) {
>         channelCounter.addToEventPutSuccessCount(puts);
>       }
>       if (takes > 0) {
>         channelCounter.addToEventTakeSuccessCount(takes);
>       }
>       channelCounter.setChannelSize(queue.size());
>     }
> I added the line "bytesRemaining.release(putByteCounter);" after the events 
> are moved from the putList to the queue.
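
The permit accounting described in the report can be illustrated with a small standalone model. This is only a sketch under stated assumptions and not Flume's MemoryChannel: the class ByteBudgetSketch, its methods, and the releaseAfterPut switch are hypothetical names invented for illustration, and the keep-alive is expressed in milliseconds to keep the example fast. It shows a put commit failing once the permits held by earlier puts are exhausted, and the same sequence succeeding when permits are returned, either by a take commit or by a release right after the put.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/** Toy model of a byte budget guarded by a semaphore (hypothetical, not Flume code). */
class ByteBudgetSketch {
    private final Semaphore bytesRemaining;  // one permit per byte of budget
    private final long keepAliveMillis;      // how long a put commit waits for permits
    private final boolean releaseAfterPut;   // hypothetical switch modelling the extra release

    ByteBudgetSketch(int byteCapacity, long keepAliveMillis, boolean releaseAfterPut) {
        this.bytesRemaining = new Semaphore(byteCapacity);
        this.keepAliveMillis = keepAliveMillis;
        this.releaseAfterPut = releaseAfterPut;
    }

    /** Returns true if the put batch fits in the budget, false if the wait timed out. */
    boolean commitPut(int putBytes) {
        try {
            if (!bytesRemaining.tryAcquire(putBytes, keepAliveMillis, TimeUnit.MILLISECONDS)) {
                return false; // models the "Cannot commit transaction" case
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
        if (releaseAfterPut) {
            bytesRemaining.release(putBytes); // permits handed back right after the put
        }
        return true;
    }

    /** Hands permits back when a take batch commits. */
    void commitTake(int takeBytes) {
        bytesRemaining.release(takeBytes);
    }

    int available() {
        return bytesRemaining.availablePermits();
    }

    public static void main(String[] args) {
        ByteBudgetSketch held = new ByteBudgetSketch(100, 10, false);
        System.out.println(held.commitPut(60)); // true: 40 permits left
        System.out.println(held.commitPut(60)); // false: only 40 permits, wait times out
        held.commitTake(60);                    // a take commit returns 60 permits
        System.out.println(held.commitPut(60)); // true again

        ByteBudgetSketch released = new ByteBudgetSketch(100, 10, true);
        System.out.println(released.commitPut(60)); // true
        System.out.println(released.commitPut(60)); // true: permits were handed back
    }
}
```

In this model, with releaseAfterPut enabled the semaphore only bounds the size of a single commit and no longer tracks the bytes sitting in the queue. Whether that trade-off is acceptable for the real channel is a separate question for the patch review.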



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
