Updated Branches: refs/heads/flume-1.5 b7ef76b16 -> f53d62a35
FLUME-2233. MemoryChannel lock contention on every put due to bytesRemaining Semaphore (Hari Shreedharan via Roshan Naik) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/f53d62a3 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/f53d62a3 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/f53d62a3 Branch: refs/heads/flume-1.5 Commit: f53d62a359a3a74d0c0fe763ca2f547a188e612e Parents: b7ef76b Author: Roshan Naik <[email protected]> Authored: Thu Nov 7 11:42:05 2013 -0800 Committer: Roshan Naik <[email protected]> Committed: Thu Nov 7 11:43:12 2013 -0800 ---------------------------------------------------------------------- .../org/apache/flume/channel/MemoryChannel.java | 25 ++++---- .../apache/flume/channel/TestMemoryChannel.java | 65 +++++++++++++++----- 2 files changed, 64 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/f53d62a3/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java b/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java index 688323d..f10a79f 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java +++ b/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java @@ -79,18 +79,11 @@ public class MemoryChannel extends BasicChannelSemantics { channelCounter.incrementEventPutAttemptCount(); int eventByteSize = (int)Math.ceil(estimateEventSize(event)/byteCapacitySlotSize); - if (bytesRemaining.tryAcquire(eventByteSize, keepAlive, TimeUnit.SECONDS)) { - if(!putList.offer(event)) { - throw new ChannelException("Put queue for MemoryTransaction of capacity " + - putList.size() + " full, consider committing more frequently, " + - "increasing capacity or increasing thread count"); - } - } else { - throw new ChannelException("Put queue for MemoryTransaction of byteCapacity " + - (lastByteCapacity * (int)byteCapacitySlotSize) + " bytes cannot add an " + - " event of size " + estimateEventSize(event) + " bytes because " + - (bytesRemaining.availablePermits() * (int)byteCapacitySlotSize) + " bytes are already used." + - " Try consider comitting more frequently, increasing byteCapacity or increasing thread count"); + if (!putList.offer(event)) { + throw new ChannelException( + "Put queue for MemoryTransaction of capacity " + + putList.size() + " full, consider committing more frequently, " + + "increasing capacity or increasing thread count"); } putByteCounter += eventByteSize; } @@ -124,7 +117,15 @@ public class MemoryChannel extends BasicChannelSemantics { protected void doCommit() throws InterruptedException { int remainingChange = takeList.size() - putList.size(); if(remainingChange < 0) { + if(!bytesRemaining.tryAcquire(putByteCounter, keepAlive, + TimeUnit.SECONDS)) { + throw new ChannelException("Cannot commit transaction. Heap space " + + "limit of " + byteCapacity + "reached. Please increase heap space" + + " allocated to the channel as the sinks may not be keeping up " + + "with the sources"); + } if(!queueRemaining.tryAcquire(-remainingChange, keepAlive, TimeUnit.SECONDS)) { + bytesRemaining.release(putByteCounter); throw new ChannelException("Space for commit to queue couldn't be acquired" + " Sinks are likely not keeping up with sources, or the buffer size is too tight"); } http://git-wip-us.apache.org/repos/asf/flume/blob/f53d62a3/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannel.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannel.java b/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannel.java index a78581a..7851536 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannel.java +++ b/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannel.java @@ -356,63 +356,100 @@ public class TestMemoryChannel { Transaction tx = channel.getTransaction(); tx.begin(); channel.put(EventBuilder.withBody(eventBody)); - + tx.commit(); + tx.close(); + channel.stop(); parms.put("byteCapacity", "1500"); context.putAll(parms); Configurables.configure(channel, context); - + channel.start(); + tx = channel.getTransaction(); + tx.begin(); channel.put(EventBuilder.withBody(eventBody)); try { channel.put(EventBuilder.withBody(eventBody)); + tx.commit(); Assert.fail(); } catch ( ChannelException e ) { //success + tx.rollback(); + } finally { + tx.close(); } - parms.put("byteCapacity", "2500"); + channel.stop(); + parms.put("byteCapacity", "250"); parms.put("byteCapacityBufferPercentage", "20"); context.putAll(parms); Configurables.configure(channel, context); - + channel.start(); + tx = channel.getTransaction(); + tx.begin(); channel.put(EventBuilder.withBody(eventBody)); + tx.commit(); + tx.close(); + channel.stop(); parms.put("byteCapacity", "300"); context.putAll(parms); Configurables.configure(channel, context); - - channel.put(EventBuilder.withBody(eventBody)); + channel.start(); + tx = channel.getTransaction(); + tx.begin(); try { - channel.put(EventBuilder.withBody(eventBody)); + for(int i = 0; i < 2; i++) { + channel.put(EventBuilder.withBody(eventBody)); + } + tx.commit(); Assert.fail(); } catch ( ChannelException e ) { //success + tx.rollback(); + } finally { + tx.close(); } + channel.stop(); parms.put("byteCapacity", "3300"); context.putAll(parms); Configurables.configure(channel, context); - - channel.put(EventBuilder.withBody(eventBody)); + channel.start(); + tx = channel.getTransaction(); + tx.begin(); try { - channel.put(EventBuilder.withBody(eventBody)); + for(int i = 0; i < 15; i++) { + channel.put(EventBuilder.withBody(eventBody)); + } + tx.commit(); Assert.fail(); } catch ( ChannelException e ) { //success + tx.rollback(); + } finally { + tx.close(); } - + channel.stop(); parms.put("byteCapacity", "4000"); context.putAll(parms); Configurables.configure(channel, context); - - channel.put(EventBuilder.withBody(eventBody)); + channel.start(); + tx = channel.getTransaction(); + tx.begin(); try { - channel.put(EventBuilder.withBody(eventBody)); + for(int i = 0; i < 25; i++) { + channel.put(EventBuilder.withBody(eventBody)); + } + tx.commit(); Assert.fail(); } catch ( ChannelException e ) { //success + tx.rollback(); + } finally { + tx.close(); } + channel.stop(); } /*
