[
https://issues.apache.org/jira/browse/FLUME-2739?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
yinghua_zh updated FLUME-2739:
------------------------------
Summary: Memory channel does not release the put permits from the
bytesRemaining semaphore (was: Memory channel has not release permits from
the bytesRemaining semaphore )
> Memory channel does not release the put permits from the bytesRemaining
> semaphore
> ----------------------------------------------------------------------------------
>
> Key: FLUME-2739
> URL: https://issues.apache.org/jira/browse/FLUME-2739
> Project: Flume
> Issue Type: Bug
> Components: Channel
> Affects Versions: v1.5.0
> Reporter: yinghua_zh
>
> The memory channel apply the permit when the source put and commit the event
> into the memory channel,but it does not release the permits from the
> bytesRemaining semaphore when the source commits successfully,It will not
> accepet the event for a while.it will occurs the following exceptions:
> 015-07-08 07:52:06,089 | WARN | [pool-4-thread-1] | The channel is full,
> and cannot write data now. The source will try again after 500 milliseconds
> |
> org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:276)
> org.apache.flume.ChannelException: Unable to put batch on required channel:
> org.apache.flume.channel.MemoryChannel{name: mem_channel}
> at
> org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:200)
> at
> org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:273)
> at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.flume.ChannelException: Cannot commit transaction. Byte
> capacity allocated to store event body 9.69943E8reached. Please increase heap
> space/byte capacity allocated to the channel as the sinks may not be keeping
> up with the sources
> at
> org.apache.flume.channel.MemoryChannel$MemoryTransaction.doCommit(MemoryChannel.java:131)
> at
> org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151)
> at
> org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:192)
> I modify the doCommit method ,and like this ,it does not occur exception:
> @Override
> protected void doCommit() throws InterruptedException {
> int remainingChange = takeList.size() - putList.size();
> if(remainingChange < 0) {
> if(!bytesRemaining.tryAcquire(putByteCounter, keepAlive,
> TimeUnit.SECONDS)) {
> throw new ChannelException("Cannot commit transaction. Byte
> capacity " +
> "allocated to store event body " + byteCapacity *
> byteCapacitySlotSize +
> "reached. Please increase heap space/byte capacity allocated to "
> +
> "the channel as the sinks may not be keeping up with the
> sources");
> }
> if(!queueRemaining.tryAcquire(-remainingChange, keepAlive,
> TimeUnit.SECONDS)) {
> bytesRemaining.release(putByteCounter);
> throw new ChannelFullException("Space for commit to queue couldn't
> be acquired." +
> " Sinks are likely not keeping up with sources, or the buffer
> size is too tight");
> }
> }
> int puts = putList.size();
> int takes = takeList.size();
> synchronized(queueLock) {
> if(puts > 0 ) {
> while(!putList.isEmpty()) {
> if(!queue.offer(putList.removeFirst())) {
> throw new RuntimeException("Queue add failed, this shouldn't be
> able to happen");
> }
> }
> bytesRemaining.release(putByteCounter);
> }
> putList.clear();
> takeList.clear();
> }
> bytesRemaining.release(takeByteCounter);
> takeByteCounter = 0;
> putByteCounter = 0;
> queueStored.release(puts);
> if(remainingChange > 0) {
> queueRemaining.release(remainingChange);
> }
> if (puts > 0) {
> channelCounter.addToEventPutSuccessCount(puts);
> }
> if (takes > 0) {
> channelCounter.addToEventTakeSuccessCount(takes);
> }
> channelCounter.setChannelSize(queue.size());
> }
> I add the code "bytesRemaining.release(putByteCounter);" after the event take
> the putList to queue.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)