yinghua_zh created FLUME-2739:
---------------------------------
Summary: Memory channel does not release permits from the bytesRemaining semaphore
Key: FLUME-2739
URL: https://issues.apache.org/jira/browse/FLUME-2739
Project: Flume
Issue Type: Bug
Components: Channel
Affects Versions: v1.5.0
Reporter: yinghua_zh
The memory channel acquires permits from the bytesRemaining semaphore when a source puts events into the channel and commits the transaction, but it does not release those permits after the source's commit succeeds. As a result, the channel stops accepting events for a while, and the following exception occurs:
015-07-08 07:52:06,089 | WARN | [pool-4-thread-1] | The channel is full, and cannot write data now. The source will try again after 500 milliseconds | org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:276)
org.apache.flume.ChannelException: Unable to put batch on required channel: org.apache.flume.channel.MemoryChannel{name: mem_channel}
	at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:200)
	at org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:273)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flume.ChannelException: Cannot commit transaction. Byte capacity allocated to store event body 9.69943E8reached. Please increase heap space/byte capacity allocated to the channel as the sinks may not be keeping up with the sources
	at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doCommit(MemoryChannel.java:131)
	at org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151)
	at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:192)
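For context, the byte accounting behind this exception can be modeled with a plain java.util.concurrent.Semaphore. The sketch below is a hypothetical toy model, not Flume code: ByteCapacityModel, commitPut and commitTake are illustrative names, and the timeout is in milliseconds rather than the keepAlive seconds used by MemoryChannel. It assumes, based on the bytesRemaining.release(takeByteCounter) call in the doCommit method quoted below, that bytes acquired when a source commits a put are returned only when a sink later commits a take of those events. Under that assumption, a put commit fails with the "Byte capacity ... reached" condition whenever the sinks have not yet drained enough bytes.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical toy model (not Flume code) of the bytesRemaining accounting:
 * a put commit acquires permits for the bytes it adds, and a take commit
 * releases permits for the bytes it removes (assumed behavior).
 */
public class ByteCapacityModel {
    private final Semaphore bytesRemaining;
    private final long keepAliveMillis;

    public ByteCapacityModel(int byteCapacity, long keepAliveMillis) {
        this.bytesRemaining = new Semaphore(byteCapacity);
        this.keepAliveMillis = keepAliveMillis;
    }

    /** Source side: returns false where doCommit() would throw ChannelException. */
    public boolean commitPut(int putByteCounter) {
        try {
            // On timeout, tryAcquire takes no permits at all.
            return bytesRemaining.tryAcquire(putByteCounter, keepAliveMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        }
    }

    /** Sink side: the bytes of the taken events go back to the semaphore. */
    public void commitTake(int takeByteCounter) {
        bytesRemaining.release(takeByteCounter);
    }

    public int available() {
        return bytesRemaining.availablePermits();
    }

    public static void main(String[] args) {
        ByteCapacityModel ch = new ByteCapacityModel(1000, 10);
        System.out.println(ch.commitPut(600)); // true: 400 permits left
        System.out.println(ch.commitPut(600)); // false: no sink has taken anything yet
        ch.commitTake(600);                    // a sink commits a take of the first batch
        System.out.println(ch.commitPut(600)); // true again
        System.out.println(ch.available());    // 400
    }
}
```

Running main prints true, false, true and 400: in this model the second put is refused until a take returns the first batch's permits, which matches the "sinks may not be keeping up with the sources" hint in the exception message.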
I modified the doCommit method as follows, and the exception no longer occurs:
@Override
protected void doCommit() throws InterruptedException {
  int remainingChange = takeList.size() - putList.size();
  if (remainingChange < 0) {
    if (!bytesRemaining.tryAcquire(putByteCounter, keepAlive,
        TimeUnit.SECONDS)) {
      throw new ChannelException("Cannot commit transaction. Byte capacity " +
          "allocated to store event body " + byteCapacity * byteCapacitySlotSize +
          "reached. Please increase heap space/byte capacity allocated to " +
          "the channel as the sinks may not be keeping up with the sources");
    }
    if (!queueRemaining.tryAcquire(-remainingChange, keepAlive,
        TimeUnit.SECONDS)) {
      bytesRemaining.release(putByteCounter);
      throw new ChannelFullException("Space for commit to queue couldn't be acquired." +
          " Sinks are likely not keeping up with sources, or the buffer size is too tight");
    }
  }
  int puts = putList.size();
  int takes = takeList.size();
  synchronized (queueLock) {
    if (puts > 0) {
      while (!putList.isEmpty()) {
        if (!queue.offer(putList.removeFirst())) {
          throw new RuntimeException("Queue add failed, this shouldn't be able to happen");
        }
      }
      // Added line: release the byte permits acquired for the puts above.
      bytesRemaining.release(putByteCounter);
    }
    putList.clear();
    takeList.clear();
  }
  bytesRemaining.release(takeByteCounter);
  takeByteCounter = 0;
  putByteCounter = 0;
  queueStored.release(puts);
  if (remainingChange > 0) {
    queueRemaining.release(remainingChange);
  }
  if (puts > 0) {
    channelCounter.addToEventPutSuccessCount(puts);
  }
  if (takes > 0) {
    channelCounter.addToEventTakeSuccessCount(takes);
  }
  channelCounter.setChannelSize(queue.size());
}
The only change is the added line "bytesRemaining.release(putByteCounter);", which runs after the events have been moved from putList into the queue.
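One way to sanity-check the change is to follow the bytesRemaining permit count through one complete cycle in which a source commits a batch and a sink later commits a take of the same batch. The sketch below is a hypothetical toy model (PermitBalanceModel and runCycle are illustrative names, not Flume APIs). It assumes that the sink's take commit releases the taken bytes through the bytesRemaining.release(takeByteCounter) line in doCommit, and that the put's bytes are otherwise held until then.

```java
import java.util.concurrent.Semaphore;

/**
 * Hypothetical toy model (not Flume code) of the bytesRemaining permit
 * balance over one put/take cycle, with and without the extra
 * bytesRemaining.release(putByteCounter) added in the patch.
 * Assumption: a take commit releases the bytes of the taken events.
 */
public class PermitBalanceModel {

    public static int runCycle(int byteCapacity, int batchBytes, boolean patched) {
        Semaphore bytesRemaining = new Semaphore(byteCapacity);

        // Source commits a put: permits are acquired for the batch's bytes.
        if (!bytesRemaining.tryAcquire(batchBytes)) {
            throw new IllegalStateException("byte capacity reached");
        }
        if (patched) {
            // Patched doCommit(): the put's bytes are released right after
            // the events are moved from putList into the queue.
            bytesRemaining.release(batchBytes);
        }

        // Sink commits a take of the same batch: takeByteCounter is released.
        bytesRemaining.release(batchBytes);

        return bytesRemaining.availablePermits();
    }

    public static void main(String[] args) {
        System.out.println("original: " + runCycle(1000, 600, false)); // 1000
        System.out.println("patched:  " + runCycle(1000, 600, true));  // 1600
    }
}
```

With a capacity of 1000 and a 600-byte batch, the model ends at 1000 permits for the original code and at 1600 for the patched version, because Semaphore.release has no upper bound and the batch's bytes are released once on the put commit and again on the take commit. If the real take path behaves as assumed here, the available permits could grow past the configured byte capacity, so the assumption is worth confirming against the full MemoryChannel source.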
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)