Repository: flume Updated Branches: refs/heads/trunk 911de0078 -> f581f6ed0
FLUME-2812. Fix semaphore leak causing java.lang.Error: Maximum permit count exceeded in MemoryChannel. bytesRemaining.release(putByteCounter) gets called in MemoryTransaction.doRollback while acquire is called only in doCommit. This results in a semaphore leak, and the number of permits in the semaphore eventually exceeds Integer.MAX_VALUE and Semaphore.release() throws java.lang.Error: Maximum permit count exceeded. This closes #83 Reviewers: Attila Simon, Bessenyei Balázs Donát (Denes Arvay via Bessenyei Balázs Donát) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/f581f6ed Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/f581f6ed Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/f581f6ed Branch: refs/heads/trunk Commit: f581f6ed0e820da370c4a83a4ff7b05278d09fc3 Parents: 911de00 Author: Denes Arvay <[email protected]> Authored: Fri Oct 28 17:38:33 2016 +0200 Committer: Bessenyei Balázs Donát <[email protected]> Committed: Sat Nov 5 05:06:59 2016 +0000 ---------------------------------------------------------------------- .../org/apache/flume/channel/MemoryChannel.java | 7 ++++++- .../apache/flume/channel/TestMemoryChannel.java | 20 ++++++++++++++++++-- 2 files changed, 24 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/f581f6ed/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java b/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java index 4393783..add40e9 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java +++ b/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java @@ -18,6 +18,7 @@ */ package org.apache.flume.channel; +import 
com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import org.apache.flume.ChannelException; import org.apache.flume.ChannelFullException; @@ -171,7 +172,6 @@ public class MemoryChannel extends BasicChannelSemantics { } putList.clear(); } - bytesRemaining.release(putByteCounter); putByteCounter = 0; takeByteCounter = 0; @@ -374,4 +374,9 @@ public class MemoryChannel extends BasicChannelSemantics { //Each event occupies at least 1 slot, so return 1. return 1; } + + @VisibleForTesting + int getBytesRemainingValue() { + return bytesRemaining.availablePermits(); + } } http://git-wip-us.apache.org/repos/asf/flume/blob/f581f6ed/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannel.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannel.java b/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannel.java index 8921a19..344bb58 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannel.java +++ b/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannel.java @@ -19,7 +19,7 @@ package org.apache.flume.channel; -import org.apache.flume.Channel; +import com.google.common.collect.ImmutableMap; import org.apache.flume.ChannelException; import org.apache.flume.Context; import org.apache.flume.Event; @@ -27,6 +27,7 @@ import org.apache.flume.EventDeliveryException; import org.apache.flume.Transaction; import org.apache.flume.conf.Configurables; import org.apache.flume.event.EventBuilder; +import org.apache.flume.event.SimpleEvent; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -39,7 +40,7 @@ import static org.fest.reflect.core.Reflection.field; public class TestMemoryChannel { - private Channel channel; + private MemoryChannel channel; @Before public void setUp() { @@ -265,6 +266,21 @@ public class TestMemoryChannel { } + @Test + public 
void testByteCapacityAfterRollback() { + Context ctx = new Context(ImmutableMap.of("byteCapacity", "1000")); + Configurables.configure(channel, ctx); + + Assert.assertEquals(8, channel.getBytesRemainingValue()); + Event e = new SimpleEvent(); + Transaction t = channel.getTransaction(); + t.begin(); + + channel.put(e); + t.rollback(); + Assert.assertEquals(8, channel.getBytesRemainingValue()); + } + public void testByteCapacityBufferEmptyingAfterTakeCommit() { Context context = new Context(); Map<String, String> parms = new HashMap<String, String>();
