Updated Branches:
  refs/heads/flume-1.4 2479bbfa8 -> 47dd06182

FLUME-1975. Use TThreadedSelectorServer in ThriftSource if it is available.

(Hari Shreedharan via Mike Percy)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/47dd0618
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/47dd0618
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/47dd0618

Branch: refs/heads/flume-1.4
Commit: 47dd06182e0419ecd189ed7bb7a6c66df77ca796
Parents: 2479bbf
Author: Mike Percy <[email protected]>
Authored: Fri Jun 21 17:27:55 2013 -0700
Committer: Mike Percy <[email protected]>
Committed: Fri Jun 21 17:32:56 2013 -0700

----------------------------------------------------------------------
 .../org/apache/flume/source/ThriftSource.java   | 97 +++++++++++++++++---
 pom.xml                                         |  4 +-
 2 files changed, 87 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/47dd0618/flume-ng-core/src/main/java/org/apache/flume/source/ThriftSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/ThriftSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/ThriftSource.java
index 68a632a..c3881b4 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/ThriftSource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/ThriftSource.java
@@ -34,19 +34,22 @@ import org.apache.flume.thrift.ThriftSourceProtocol;
 import org.apache.flume.thrift.ThriftFlumeEvent;
 import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.server.TNonblockingServer;
 import org.apache.thrift.server.TServer;
-import org.apache.thrift.server.TThreadPoolServer;
 import org.apache.thrift.transport.TFastFramedTransport;
+import org.apache.thrift.transport.TNonblockingServerSocket;
+import org.apache.thrift.transport.TNonblockingServerTransport;
 import org.apache.thrift.transport.TServerSocket;
 import org.apache.thrift.transport.TServerTransport;
-import org.apache.thrift.transport.TTransportException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.lang.reflect.Method;
 import java.net.InetSocketAddress;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 
 public class ThriftSource extends AbstractSource implements Configurable,
@@ -100,22 +103,90 @@ public class ThriftSource extends AbstractSource implements Configurable,
   @Override
   public void start() {
     logger.info("Starting thrift source");
+
     maxThreads = (maxThreads <= 0) ? Integer.MAX_VALUE : maxThreads;
+    Class<?> serverClass = null;
+    Class<?> argsClass = null;
+    TServer.AbstractServerArgs args = null;
+    /*
+     * Use reflection to determine if TThreadedSelectorServer is available. If
+     * it is not available, use TThreadPoolServer.
+     */
     try {
-      serverTransport = new TServerSocket(new InetSocketAddress
-        (bindAddress, port));
-    } catch (TTransportException e) {
-      throw new FlumeException("Failed to start Thrift Source.", e);
+      serverClass = Class.forName("org.apache.thrift" +
+        ".server.TThreadedSelectorServer");
+
+      argsClass = Class.forName("org.apache.thrift" +
+        ".server.TThreadedSelectorServer$Args");
+
+      // Looks like TThreadedSelectorServer is available, so continue..
+      ExecutorService sourceService;
+      ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat(
+        "Flume Thrift IPC Thread %d").build();
+      if (maxThreads == 0) {
+        sourceService = Executors.newCachedThreadPool(threadFactory);
+      } else {
+        sourceService = Executors.newFixedThreadPool(maxThreads, threadFactory);
+      }
+      serverTransport = new TNonblockingServerSocket(
+        new InetSocketAddress(bindAddress, port));
+      args = (TNonblockingServer.AbstractNonblockingServerArgs) argsClass
+        .getConstructor(TNonblockingServerTransport.class)
+        .newInstance(serverTransport);
+      Method m = argsClass.getDeclaredMethod("executorService",
+        ExecutorService.class);
+      m.invoke(args, sourceService);
+    } catch (ClassNotFoundException e) {
+      logger.info("TThreadedSelectorServer not found, " +
+        "using TThreadPoolServer");
+      try {
+        // Looks like TThreadedSelectorServer is not available,
+        // so create a TThreadPoolServer instead.
+
+        serverTransport = new TServerSocket(new InetSocketAddress
+          (bindAddress, port));
+
+        serverClass = Class.forName("org.apache.thrift" +
+          ".server.TThreadPoolServer");
+        argsClass = Class.forName("org.apache.thrift.server" +
+          ".TThreadPoolServer$Args");
+        args = (TServer.AbstractServerArgs) argsClass
+          .getConstructor(TServerTransport.class)
+          .newInstance(serverTransport);
+        Method m = argsClass.getDeclaredMethod("maxWorkerThreads",int.class);
+        m.invoke(args, maxThreads);
+      } catch (ClassNotFoundException e1) {
+        throw new FlumeException("Cannot find TThreadSelectorServer or " +
+          "TThreadPoolServer. Please install a compatible version of thrift " +
+          "in the classpath", e1);
+      } catch (Throwable throwable) {
+        throw new FlumeException("Cannot start Thrift source.", throwable);
+      }
+    } catch (Throwable throwable) {
+      throw new FlumeException("Cannot start Thrift source.", throwable);
     }
 
-    TThreadPoolServer.Args args = new TThreadPoolServer.Args(serverTransport);
-    args.protocolFactory(new TCompactProtocol.Factory());
-    args.inputTransportFactory(new TFastFramedTransport.Factory());
-    args.outputTransportFactory(new TFastFramedTransport.Factory());
-    args.processor(new ThriftSourceProtocol.Processor<ThriftSourceHandler>(
-      new ThriftSourceHandler())).maxWorkerThreads(maxThreads);
+    try {
+
+      args.protocolFactory(new TCompactProtocol.Factory());
+      args.inputTransportFactory(new TFastFramedTransport.Factory());
+      args.outputTransportFactory(new TFastFramedTransport.Factory());
+      args.processor(new ThriftSourceProtocol
+        .Processor<ThriftSourceHandler>(new ThriftSourceHandler()));
+      /*
+       * Both THsHaServer and TThreadedSelectorServer allow us to pass in
+       * the executor service to use - unfortunately the "executorService"
+       * method does not exist in the parent abstract Args class,
+       * so use reflection to pass the executor in.
+       *
+       */
+
+      server = (TServer) serverClass.getConstructor(argsClass).newInstance
+        (args);
+    } catch (Throwable ex) {
+      throw new FlumeException("Cannot start Thrift Source.", ex);
+    }
 
-    server = new TThreadPoolServer(args);
 
     servingExecutor = Executors.newSingleThreadExecutor(new
       ThreadFactoryBuilder().setNameFormat("Flume Thrift Source I/O Boss")

http://git-wip-us.apache.org/repos/asf/flume/blob/47dd0618/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index d907586..ebc7e25 100644
--- a/pom.xml
+++ b/pom.xml
@@ -79,6 +79,7 @@ limitations under the License.
         <hadoop.version>1.0.1</hadoop.version>
         <hbase.version>0.92.1</hbase.version>
         <hadoop.common.artifact.id>hadoop-core</hadoop.common.artifact.id>
+        <thrift.version>0.7.0</thrift.version>
       </properties>
       <dependencyManagement>
         <dependencies>
@@ -108,6 +109,7 @@ limitations under the License.
         <hadoop.version>2.0.0-alpha</hadoop.version>
         <hbase.version>0.94.2</hbase.version>
         <hadoop.common.artifact.id>hadoop-common</hadoop.common.artifact.id>
+        <thrift.version>0.8.0</thrift.version>
       </properties>
       <dependencyManagement>
         <dependencies>
@@ -726,7 +728,7 @@ limitations under the License.
       <dependency>
         <groupId>org.apache.thrift</groupId>
         <artifactId>libthrift</artifactId>
-        <version>0.7.0</version>
+        <version>${thrift.version}</version>
         <exclusions>
           <exclusion>
             <groupId>javax.servlet</groupId>

Reply via email to