Updated Branches:
  refs/heads/trunk 458a1a2f7 -> d4c1bc07b

FLUME-1981. Rpc client expiration can be done in a more thread-safe way.

(Hari Shreedharan via Mike Percy)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/d4c1bc07
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/d4c1bc07
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/d4c1bc07

Branch: refs/heads/trunk
Commit: d4c1bc07bc33e10cfdc5eca11b085e269dbea60d
Parents: 458a1a2
Author: Mike Percy <[email protected]>
Authored: Wed May 8 15:56:33 2013 -0700
Committer: Mike Percy <[email protected]>
Committed: Wed May 8 15:56:33 2013 -0700

----------------------------------------------------------------------
 .../org/apache/flume/sink/AbstractRpcSink.java     |   17 ++++++++++++++-
 1 files changed, 16 insertions(+), 1 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/d4c1bc07/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractRpcSink.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractRpcSink.java 
b/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractRpcSink.java
index 892c949..b3208fc 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractRpcSink.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractRpcSink.java
@@ -43,6 +43,8 @@ import java.util.Properties;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 /**
  * This sink provides the basic RPC functionality for Flume. This sink takes
@@ -151,6 +153,7 @@ public abstract class AbstractRpcSink extends AbstractSink
   private final ScheduledExecutorService cxnResetExecutor = Executors
     .newSingleThreadScheduledExecutor(new ThreadFactoryBuilder()
       .setNameFormat("Rpc Sink Reset Thread").build());
+  private final Lock resetLock = new ReentrantLock();
 
   @Override
   public void configure(Context context) {
@@ -211,7 +214,17 @@ public abstract class AbstractRpcSink extends AbstractSink
           cxnResetExecutor.schedule(new Runnable() {
             @Override
             public void run() {
-              destroyConnection();
+              resetLock.lock();
+              try {
+                destroyConnection();
+                createConnection();
+              } catch (Throwable throwable) {
+                //Don't rethrow, else this runnable won't get scheduled again.
+                logger.error("Error while trying to expire connection",
+                  throwable);
+              } finally {
+                resetLock.unlock();
+              }
             }
           }, cxnResetInterval, TimeUnit.SECONDS);
         }
@@ -319,6 +332,7 @@ public abstract class AbstractRpcSink extends AbstractSink
     Channel channel = getChannel();
     Transaction transaction = channel.getTransaction();
 
+    resetLock.lock();
     try {
       transaction.begin();
 
@@ -368,6 +382,7 @@ public abstract class AbstractRpcSink extends AbstractSink
         throw new EventDeliveryException("Failed to send events", t);
       }
     } finally {
+      resetLock.unlock();
       transaction.close();
     }
 

Reply via email to