yinghua_zh created FLUME-2739:
---------------------------------

             Summary: Memory channel does not release permits from the bytesRemaining semaphore
                 Key: FLUME-2739
                 URL: https://issues.apache.org/jira/browse/FLUME-2739
             Project: Flume
          Issue Type: Bug
          Components: Channel
    Affects Versions: v1.5.0
            Reporter: yinghua_zh


The memory channel acquires permits from the bytesRemaining semaphore when a 
source puts events into the channel and commits the transaction, but it does 
not release those permits after the commit succeeds. As a result, the channel 
stops accepting events for a while, and the following exception occurs:
015-07-08 07:52:06,089 | WARN  | [pool-4-thread-1] |  The channel is full, and cannot write data now. The source will try again after 500 milliseconds  | org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:276)
org.apache.flume.ChannelException: Unable to put batch on required channel: org.apache.flume.channel.MemoryChannel{name: mem_channel}
        at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:200)
        at org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:273)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flume.ChannelException: Cannot commit transaction. Byte capacity allocated to store event body 9.69943E8reached. Please increase heap space/byte capacity allocated to the channel as the sinks may not be keeping up with the sources
        at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doCommit(MemoryChannel.java:131)
        at org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151)
        at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:192)
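
To make the failure mode in the trace easier to follow, here is a minimal sketch of a byte budget guarded by a java.util.concurrent.Semaphore. It is a toy model, not Flume's MemoryChannel: the class ByteBudgetSketch, its methods commitPut, commitTake and available, and the millisecond timeout are all hypothetical names chosen for illustration. It only shows that a timed tryAcquire returns false once the permits are used up and nothing has given them back, which is presumably the condition behind the "Cannot commit transaction" message.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/** Toy model of a semaphore-guarded byte budget (hypothetical, not Flume code). */
class ByteBudgetSketch {
    private final Semaphore bytesRemaining;
    private final long timeoutMillis;

    ByteBudgetSketch(int byteCapacity, long timeoutMillis) {
        this.bytesRemaining = new Semaphore(byteCapacity);
        this.timeoutMillis = timeoutMillis;
    }

    /** Reserve permits for a committed put; false if they are not free in time. */
    boolean commitPut(int bytes) {
        try {
            return bytesRemaining.tryAcquire(bytes, timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag and report the put as not committed.
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /** Give permits back when a take commits, i.e. a sink has drained events. */
    void commitTake(int bytes) {
        bytesRemaining.release(bytes);
    }

    /** Number of permits currently free. */
    int available() {
        return bytesRemaining.availablePermits();
    }

    public static void main(String[] args) {
        ByteBudgetSketch channel = new ByteBudgetSketch(100, 50);
        System.out.println(channel.commitPut(60)); // true, 40 permits remain
        System.out.println(channel.commitPut(60)); // false, nothing was released
        channel.commitTake(60);                    // a sink drains 60 bytes
        System.out.println(channel.commitPut(60)); // true again
    }
}
```

In this model a put can only succeed again after a take has returned permits, so a source that outpaces its sinks keeps hitting the timeout.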

I modified the doCommit method as follows, and the exception no longer occurs:
    @Override
    protected void doCommit() throws InterruptedException {
      int remainingChange = takeList.size() - putList.size();
      if(remainingChange < 0) {
        if(!bytesRemaining.tryAcquire(putByteCounter, keepAlive,
          TimeUnit.SECONDS)) {
          throw new ChannelException("Cannot commit transaction. Byte capacity " +
            "allocated to store event body " + byteCapacity * byteCapacitySlotSize +
            "reached. Please increase heap space/byte capacity allocated to " +
            "the channel as the sinks may not be keeping up with the sources");
        }
        if(!queueRemaining.tryAcquire(-remainingChange, keepAlive,
          TimeUnit.SECONDS)) {
          bytesRemaining.release(putByteCounter);
          throw new ChannelFullException("Space for commit to queue couldn't be acquired." +
              " Sinks are likely not keeping up with sources, or the buffer size is too tight");
        }
      }
      int puts = putList.size();
      int takes = takeList.size();
      synchronized(queueLock) {
        if(puts > 0 ) {
          while(!putList.isEmpty()) {
            if(!queue.offer(putList.removeFirst())) {
              throw new RuntimeException("Queue add failed, this shouldn't be able to happen");
            }
          }
          // Added line: release the permits acquired above for the puts.
          bytesRemaining.release(putByteCounter);
        }
        putList.clear();
        takeList.clear();
      }
      bytesRemaining.release(takeByteCounter);
      takeByteCounter = 0;
      putByteCounter = 0;

      queueStored.release(puts);
      if(remainingChange > 0) {
        queueRemaining.release(remainingChange);
      }
      if (puts > 0) {
        channelCounter.addToEventPutSuccessCount(puts);
      }
      if (takes > 0) {
        channelCounter.addToEventTakeSuccessCount(takes);
      }

      channelCounter.setChannelSize(queue.size());
    }


I added the line "bytesRemaining.release(putByteCounter);" after the events 
are moved from putList into the queue.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
