Updated Branches:
  refs/heads/trunk e02654518 -> d3f5123c4

FLUME-2233. MemoryChannel lock contention on every put due to bytesRemaining 
Semaphore

(Hari Shreedharan via Roshan Naik)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/d3f5123c
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/d3f5123c
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/d3f5123c

Branch: refs/heads/trunk
Commit: d3f5123c4d6cdbe4e5cca6e7e141e507bb1103a7
Parents: e026545
Author: Roshan Naik <[email protected]>
Authored: Thu Nov 7 11:42:05 2013 -0800
Committer: Roshan Naik <[email protected]>
Committed: Thu Nov 7 11:42:05 2013 -0800

----------------------------------------------------------------------
 .../org/apache/flume/channel/MemoryChannel.java | 25 ++++----
 .../apache/flume/channel/TestMemoryChannel.java | 65 +++++++++++++++-----
 2 files changed, 64 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/d3f5123c/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java 
b/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java
index 688323d..f10a79f 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java
@@ -79,18 +79,11 @@ public class MemoryChannel extends BasicChannelSemantics {
       channelCounter.incrementEventPutAttemptCount();
       int eventByteSize = 
(int)Math.ceil(estimateEventSize(event)/byteCapacitySlotSize);
 
-      if (bytesRemaining.tryAcquire(eventByteSize, keepAlive, 
TimeUnit.SECONDS)) {
-        if(!putList.offer(event)) {
-          throw new ChannelException("Put queue for MemoryTransaction of 
capacity " +
-              putList.size() + " full, consider committing more frequently, " +
-              "increasing capacity or increasing thread count");
-        }
-      } else {
-        throw new ChannelException("Put queue for MemoryTransaction of 
byteCapacity " +
-            (lastByteCapacity * (int)byteCapacitySlotSize) + " bytes cannot 
add an " +
-            " event of size " + estimateEventSize(event) + " bytes because " +
-             (bytesRemaining.availablePermits() * (int)byteCapacitySlotSize) + 
" bytes are already used." +
-            " Try consider comitting more frequently, increasing byteCapacity 
or increasing thread count");
+      if (!putList.offer(event)) {
+        throw new ChannelException(
+          "Put queue for MemoryTransaction of capacity " +
+            putList.size() + " full, consider committing more frequently, " +
+            "increasing capacity or increasing thread count");
       }
       putByteCounter += eventByteSize;
     }
@@ -124,7 +117,15 @@ public class MemoryChannel extends BasicChannelSemantics {
     protected void doCommit() throws InterruptedException {
       int remainingChange = takeList.size() - putList.size();
       if(remainingChange < 0) {
+        if(!bytesRemaining.tryAcquire(putByteCounter, keepAlive,
+          TimeUnit.SECONDS)) {
+          throw new ChannelException("Cannot commit transaction. Heap space " +
+            "limit of " + byteCapacity + "reached. Please increase heap space" 
+
+            " allocated to the channel as the sinks may not be keeping up " +
+            "with the sources");
+        }
         if(!queueRemaining.tryAcquire(-remainingChange, keepAlive, 
TimeUnit.SECONDS)) {
+          bytesRemaining.release(putByteCounter);
           throw new ChannelException("Space for commit to queue couldn't be 
acquired" +
               " Sinks are likely not keeping up with sources, or the buffer 
size is too tight");
         }

http://git-wip-us.apache.org/repos/asf/flume/blob/d3f5123c/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannel.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannel.java 
b/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannel.java
index a78581a..7851536 100644
--- 
a/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannel.java
+++ 
b/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannel.java
@@ -356,63 +356,100 @@ public class TestMemoryChannel {
     Transaction tx = channel.getTransaction();
     tx.begin();
     channel.put(EventBuilder.withBody(eventBody));
-
+    tx.commit();
+    tx.close();
+    channel.stop();
     parms.put("byteCapacity", "1500");
     context.putAll(parms);
     Configurables.configure(channel,  context);
-
+    channel.start();
+    tx = channel.getTransaction();
+    tx.begin();
     channel.put(EventBuilder.withBody(eventBody));
     try {
       channel.put(EventBuilder.withBody(eventBody));
+      tx.commit();
       Assert.fail();
     } catch ( ChannelException e ) {
       //success
+      tx.rollback();
+    } finally {
+      tx.close();
     }
 
-    parms.put("byteCapacity", "2500");
+    channel.stop();
+    parms.put("byteCapacity", "250");
     parms.put("byteCapacityBufferPercentage", "20");
     context.putAll(parms);
     Configurables.configure(channel,  context);
-
+    channel.start();
+    tx = channel.getTransaction();
+    tx.begin();
     channel.put(EventBuilder.withBody(eventBody));
+    tx.commit();
+    tx.close();
+    channel.stop();
 
     parms.put("byteCapacity", "300");
     context.putAll(parms);
     Configurables.configure(channel,  context);
-
-    channel.put(EventBuilder.withBody(eventBody));
+    channel.start();
+    tx = channel.getTransaction();
+    tx.begin();
     try {
-      channel.put(EventBuilder.withBody(eventBody));
+      for(int i = 0; i < 2; i++) {
+        channel.put(EventBuilder.withBody(eventBody));
+      }
+      tx.commit();
       Assert.fail();
     } catch ( ChannelException e ) {
       //success
+      tx.rollback();
+    } finally {
+      tx.close();
     }
 
+    channel.stop();
     parms.put("byteCapacity", "3300");
     context.putAll(parms);
     Configurables.configure(channel,  context);
-
-    channel.put(EventBuilder.withBody(eventBody));
+    channel.start();
+    tx = channel.getTransaction();
+    tx.begin();
 
     try {
-      channel.put(EventBuilder.withBody(eventBody));
+      for(int i = 0; i < 15; i++) {
+        channel.put(EventBuilder.withBody(eventBody));
+      }
+      tx.commit();
       Assert.fail();
     } catch ( ChannelException e ) {
       //success
+      tx.rollback();
+    } finally {
+      tx.close();
     }
-
+    channel.stop();
     parms.put("byteCapacity", "4000");
     context.putAll(parms);
     Configurables.configure(channel,  context);
-
-    channel.put(EventBuilder.withBody(eventBody));
+    channel.start();
+    tx = channel.getTransaction();
+    tx.begin();
 
     try {
-      channel.put(EventBuilder.withBody(eventBody));
+      for(int i = 0; i < 25; i++) {
+        channel.put(EventBuilder.withBody(eventBody));
+      }
+      tx.commit();
       Assert.fail();
     } catch ( ChannelException e ) {
       //success
+      tx.rollback();
+    } finally {
+      tx.close();
     }
+    channel.stop();
   }
 
   /*

Reply via email to