Updated Branches: refs/heads/flume-1.4 2479bbfa8 -> 47dd06182
FLUME-1975. Use TThreadedSelectorServer in ThriftSource if it is available. (Hari Shreedharan via Mike Percy) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/47dd0618 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/47dd0618 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/47dd0618 Branch: refs/heads/flume-1.4 Commit: 47dd06182e0419ecd189ed7bb7a6c66df77ca796 Parents: 2479bbf Author: Mike Percy <[email protected]> Authored: Fri Jun 21 17:27:55 2013 -0700 Committer: Mike Percy <[email protected]> Committed: Fri Jun 21 17:32:56 2013 -0700 ---------------------------------------------------------------------- .../org/apache/flume/source/ThriftSource.java | 97 +++++++++++++++++--- pom.xml | 4 +- 2 files changed, 87 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/47dd0618/flume-ng-core/src/main/java/org/apache/flume/source/ThriftSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/ThriftSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/ThriftSource.java index 68a632a..c3881b4 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/source/ThriftSource.java +++ b/flume-ng-core/src/main/java/org/apache/flume/source/ThriftSource.java @@ -34,19 +34,22 @@ import org.apache.flume.thrift.ThriftSourceProtocol; import org.apache.flume.thrift.ThriftFlumeEvent; import org.apache.thrift.TException; import org.apache.thrift.protocol.TCompactProtocol; +import org.apache.thrift.server.TNonblockingServer; import org.apache.thrift.server.TServer; -import org.apache.thrift.server.TThreadPoolServer; import org.apache.thrift.transport.TFastFramedTransport; +import org.apache.thrift.transport.TNonblockingServerSocket; +import org.apache.thrift.transport.TNonblockingServerTransport; import 
org.apache.thrift.transport.TServerSocket; import org.apache.thrift.transport.TServerTransport; -import org.apache.thrift.transport.TTransportException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.lang.reflect.Method; import java.net.InetSocketAddress; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; public class ThriftSource extends AbstractSource implements Configurable, @@ -100,22 +103,90 @@ public class ThriftSource extends AbstractSource implements Configurable, @Override public void start() { logger.info("Starting thrift source"); + maxThreads = (maxThreads <= 0) ? Integer.MAX_VALUE : maxThreads; + Class<?> serverClass = null; + Class<?> argsClass = null; + TServer.AbstractServerArgs args = null; + /* + * Use reflection to determine if TThreadedSelectorServer is available. If + * it is not available, use TThreadPoolServer + */ try { - serverTransport = new TServerSocket(new InetSocketAddress - (bindAddress, port)); - } catch (TTransportException e) { - throw new FlumeException("Failed to start Thrift Source.", e); + serverClass = Class.forName("org.apache.thrift" + + ".server.TThreadedSelectorServer"); + + argsClass = Class.forName("org.apache.thrift" + + ".server.TThreadedSelectorServer$Args"); + + // Looks like TThreadedSelectorServer is available, so continue.. 
+ ExecutorService sourceService; + ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat( + "Flume Thrift IPC Thread %d").build(); + if (maxThreads == 0) { + sourceService = Executors.newCachedThreadPool(threadFactory); + } else { + sourceService = Executors.newFixedThreadPool(maxThreads, threadFactory); + } + serverTransport = new TNonblockingServerSocket( + new InetSocketAddress(bindAddress, port)); + args = (TNonblockingServer.AbstractNonblockingServerArgs) argsClass + .getConstructor(TNonblockingServerTransport.class) + .newInstance(serverTransport); + Method m = argsClass.getDeclaredMethod("executorService", + ExecutorService.class); + m.invoke(args, sourceService); + } catch (ClassNotFoundException e) { + logger.info("TThreadedSelectorServer not found, " + + "using TThreadPoolServer"); + try { + // Looks like TThreadedSelectorServer is not available, + // so create a TThreadPoolServer instead. + + serverTransport = new TServerSocket(new InetSocketAddress + (bindAddress, port)); + + serverClass = Class.forName("org.apache.thrift" + + ".server.TThreadPoolServer"); + argsClass = Class.forName("org.apache.thrift.server" + + ".TThreadPoolServer$Args"); + args = (TServer.AbstractServerArgs) argsClass + .getConstructor(TServerTransport.class) + .newInstance(serverTransport); + Method m = argsClass.getDeclaredMethod("maxWorkerThreads",int.class); + m.invoke(args, maxThreads); + } catch (ClassNotFoundException e1) { + throw new FlumeException("Cannot find TThreadSelectorServer or " + + "TThreadPoolServer. 
Please install a compatible version of thrift " + + "in the classpath", e1); + } catch (Throwable throwable) { + throw new FlumeException("Cannot start Thrift source.", throwable); + } + } catch (Throwable throwable) { + throw new FlumeException("Cannot start Thrift source.", throwable); } - TThreadPoolServer.Args args = new TThreadPoolServer.Args(serverTransport); - args.protocolFactory(new TCompactProtocol.Factory()); - args.inputTransportFactory(new TFastFramedTransport.Factory()); - args.outputTransportFactory(new TFastFramedTransport.Factory()); - args.processor(new ThriftSourceProtocol.Processor<ThriftSourceHandler>( - new ThriftSourceHandler())).maxWorkerThreads(maxThreads); + try { + + args.protocolFactory(new TCompactProtocol.Factory()); + args.inputTransportFactory(new TFastFramedTransport.Factory()); + args.outputTransportFactory(new TFastFramedTransport.Factory()); + args.processor(new ThriftSourceProtocol + .Processor<ThriftSourceHandler>(new ThriftSourceHandler())); + /* + * Both THsHaServer and TThreadedSelectorServer allows us to pass in + * the executor service to use - unfortunately the "executorService" + * method does not exist in the parent abstract Args class, + * so use reflection to pass the executor in. + * + */ + + server = (TServer) serverClass.getConstructor(argsClass).newInstance + (args); + } catch (Throwable ex) { + throw new FlumeException("Cannot start Thrift Source.", ex); + } - server = new TThreadPoolServer(args); servingExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("Flume Thrift Source I/O Boss") http://git-wip-us.apache.org/repos/asf/flume/blob/47dd0618/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index d907586..ebc7e25 100644 --- a/pom.xml +++ b/pom.xml @@ -79,6 +79,7 @@ limitations under the License. 
<hadoop.version>1.0.1</hadoop.version> <hbase.version>0.92.1</hbase.version> <hadoop.common.artifact.id>hadoop-core</hadoop.common.artifact.id> + <thrift.version>0.7.0</thrift.version> </properties> <dependencyManagement> <dependencies> @@ -108,6 +109,7 @@ limitations under the License. <hadoop.version>2.0.0-alpha</hadoop.version> <hbase.version>0.94.2</hbase.version> <hadoop.common.artifact.id>hadoop-common</hadoop.common.artifact.id> + <thrift.version>0.8.0</thrift.version> </properties> <dependencyManagement> <dependencies> @@ -726,7 +728,7 @@ limitations under the License. <dependency> <groupId>org.apache.thrift</groupId> <artifactId>libthrift</artifactId> - <version>0.7.0</version> + <version>${thrift.version}</version> <exclusions> <exclusion> <groupId>javax.servlet</groupId>
