Repository: flume
Updated Branches:
  refs/heads/trunk 911de0078 -> f581f6ed0


FLUME-2812. Fix semaphore leak causing java.lang.Error: Maximum permit count 
exceeded in MemoryChannel

bytesRemaining.release(putByteCounter) gets called in 
MemoryTransaction.doRollback while acquire is called only in doCommit. This 
results in semaphore leak and the number of permits in the semaphore eventually 
exceeds Integer.MAX_VALUE and Semaphore.release() throws java.lang.Error: 
Maximum permit count exceeded.

This closes #83

Reviewers: Attila Simon, Bessenyei Balázs Donát

(Denes Arvay via Bessenyei Balázs Donát)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/f581f6ed
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/f581f6ed
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/f581f6ed

Branch: refs/heads/trunk
Commit: f581f6ed0e820da370c4a83a4ff7b05278d09fc3
Parents: 911de00
Author: Denes Arvay <[email protected]>
Authored: Fri Oct 28 17:38:33 2016 +0200
Committer: Bessenyei Balázs Donát <[email protected]>
Committed: Sat Nov 5 05:06:59 2016 +0000

----------------------------------------------------------------------
 .../org/apache/flume/channel/MemoryChannel.java |  7 ++++++-
 .../apache/flume/channel/TestMemoryChannel.java | 20 ++++++++++++++++++--
 2 files changed, 24 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/f581f6ed/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java 
b/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java
index 4393783..add40e9 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java
@@ -18,6 +18,7 @@
  */
 package org.apache.flume.channel;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.apache.flume.ChannelException;
 import org.apache.flume.ChannelFullException;
@@ -171,7 +172,6 @@ public class MemoryChannel extends BasicChannelSemantics {
         }
         putList.clear();
       }
-      bytesRemaining.release(putByteCounter);
       putByteCounter = 0;
       takeByteCounter = 0;
 
@@ -374,4 +374,9 @@ public class MemoryChannel extends BasicChannelSemantics {
     //Each event occupies at least 1 slot, so return 1.
     return 1;
   }
+
+  @VisibleForTesting
+  int getBytesRemainingValue() {
+    return bytesRemaining.availablePermits();
+  }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/f581f6ed/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannel.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannel.java 
b/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannel.java
index 8921a19..344bb58 100644
--- 
a/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannel.java
+++ 
b/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannel.java
@@ -19,7 +19,7 @@
 
 package org.apache.flume.channel;
 
-import org.apache.flume.Channel;
+import com.google.common.collect.ImmutableMap;
 import org.apache.flume.ChannelException;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
@@ -27,6 +27,7 @@ import org.apache.flume.EventDeliveryException;
 import org.apache.flume.Transaction;
 import org.apache.flume.conf.Configurables;
 import org.apache.flume.event.EventBuilder;
+import org.apache.flume.event.SimpleEvent;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -39,7 +40,7 @@ import static org.fest.reflect.core.Reflection.field;
 
 public class TestMemoryChannel {
 
-  private Channel channel;
+  private MemoryChannel channel;
 
   @Before
   public void setUp() {
@@ -265,6 +266,21 @@ public class TestMemoryChannel {
 
   }
 
+  @Test
+  public void testByteCapacityAfterRollback() {
+    Context ctx = new Context(ImmutableMap.of("byteCapacity", "1000"));
+    Configurables.configure(channel,  ctx);
+
+    Assert.assertEquals(8, channel.getBytesRemainingValue());
+    Event e = new SimpleEvent();
+    Transaction t = channel.getTransaction();
+    t.begin();
+
+    channel.put(e);
+    t.rollback();
+    Assert.assertEquals(8, channel.getBytesRemainingValue());
+  }
+
   public void testByteCapacityBufferEmptyingAfterTakeCommit() {
     Context context = new Context();
     Map<String, String> parms = new HashMap<String, String>();

Reply via email to