Repository: tajo Updated Branches: refs/heads/master 64e47a401 -> 22876a825
http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcChannelFactory.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcChannelFactory.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcChannelFactory.java index 0727f71..ed6b634 100644 --- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcChannelFactory.java +++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcChannelFactory.java @@ -19,73 +19,125 @@ package org.apache.tajo.rpc; import com.google.common.util.concurrent.ThreadFactoryBuilder; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.jboss.netty.channel.socket.ClientSocketChannelFactory; -import org.jboss.netty.channel.socket.ServerSocketChannelFactory; -import org.jboss.netty.channel.socket.nio.*; -import org.jboss.netty.util.HashedWheelTimer; -import org.jboss.netty.util.ThreadNameDeterminer; -import java.util.concurrent.Executors; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; + +import java.util.Map; +import java.util.Queue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicInteger; public final class RpcChannelFactory { private static final Log LOG = LogFactory.getLog(RpcChannelFactory.class); - + private static final int DEFAULT_WORKER_NUM = Runtime.getRuntime().availableProcessors() * 2; - private static ClientSocketChannelFactory factory; - private static AtomicInteger clientCount = new AtomicInteger(0); + private static final Object lockObjectForLoopGroup = new Object(); private static AtomicInteger serverCount = new AtomicInteger(0); + public enum ClientChannelId { + CLIENT_DEFAULT, + FETCHER + } + + private static final Map<ClientChannelId, Integer> defaultMaxKeyPoolCount = + new ConcurrentHashMap<ClientChannelId, Integer>(); + private static final Map<ClientChannelId, Queue<EventLoopGroup>> eventLoopGroupPool = + new ConcurrentHashMap<ClientChannelId, Queue<EventLoopGroup>>(); + private RpcChannelFactory(){ } + + static { + Runtime.getRuntime().addShutdownHook(new CleanUpHandler()); + + defaultMaxKeyPoolCount.put(ClientChannelId.CLIENT_DEFAULT, 1); + defaultMaxKeyPoolCount.put(ClientChannelId.FETCHER, 1); + } /** - * make this factory static thus all clients can share its thread pool. - * NioClientSocketChannelFactory has only one method newChannel() visible for user, which is thread-safe - */ - public static synchronized ClientSocketChannelFactory getSharedClientChannelFactory() { - return getSharedClientChannelFactory(DEFAULT_WORKER_NUM); + * make this factory static thus all clients can share its thread pool. + * NioClientSocketChannelFactory has only one method newChannel() visible for user, which is thread-safe + */ + public static EventLoopGroup getSharedClientEventloopGroup() { + return getSharedClientEventloopGroup(DEFAULT_WORKER_NUM); + } + + /** + * make this factory static thus all clients can share its thread pool. + * NioClientSocketChannelFactory has only one method newChannel() visible for user, which is thread-safe + * + * @param workerNum The number of workers + */ + public static EventLoopGroup getSharedClientEventloopGroup(int workerNum){ + //shared woker and boss pool + return getSharedClientEventloopGroup(ClientChannelId.CLIENT_DEFAULT, workerNum); } /** - * make this factory static thus all clients can share its thread pool. - * NioClientSocketChannelFactory has only one method newChannel() visible for user, which is thread-safe + * This function return eventloopgroup by key. Fetcher client will have one or more eventloopgroup for its throughput. * - * @param workerNum The number of workers + * @param clientId + * @param workerNum + * @return */ - public static synchronized ClientSocketChannelFactory getSharedClientChannelFactory(int workerNum){ - //shared woker and boss pool - if(factory == null){ - factory = createClientChannelFactory("Internal-Client", workerNum); + public static EventLoopGroup getSharedClientEventloopGroup(ClientChannelId clientId, int workerNum) { + Queue<EventLoopGroup> eventLoopGroupQueue; + EventLoopGroup returnEventLoopGroup; + + synchronized (lockObjectForLoopGroup) { + eventLoopGroupQueue = eventLoopGroupPool.get(clientId); + if (eventLoopGroupQueue == null) { + eventLoopGroupQueue = createClientEventloopGroups(clientId, workerNum); + } + + returnEventLoopGroup = eventLoopGroupQueue.poll(); + if (isEventLoopGroupShuttingDown(returnEventLoopGroup)) { + returnEventLoopGroup = createClientEventloopGroup(clientId.name(), workerNum); + } + eventLoopGroupQueue.add(returnEventLoopGroup); } - return factory; + + return returnEventLoopGroup; + } + + protected static boolean isEventLoopGroupShuttingDown(EventLoopGroup eventLoopGroup) { + return ((eventLoopGroup == null) || eventLoopGroup.isShuttingDown()); } // Client must release the external resources - public static synchronized ClientSocketChannelFactory createClientChannelFactory(String name, int workerNum) { - name = name + "-" + clientCount.incrementAndGet(); - if(LOG.isDebugEnabled()){ - LOG.debug("Create " + name + " ClientSocketChannelFactory. Worker:" + workerNum); + protected static Queue<EventLoopGroup> createClientEventloopGroups(ClientChannelId clientId, int workerNum) { + int defaultMaxObjectCount = defaultMaxKeyPoolCount.get(clientId); + Queue<EventLoopGroup> loopGroupQueue = new ConcurrentLinkedQueue<EventLoopGroup>(); + eventLoopGroupPool.put(clientId, loopGroupQueue); + + for (int objectIdx = 0; objectIdx < defaultMaxObjectCount; objectIdx++) { + loopGroupQueue.add(createClientEventloopGroup(clientId.name(), workerNum)); } - ThreadFactoryBuilder builder = new ThreadFactoryBuilder(); - ThreadFactory bossFactory = builder.setNameFormat(name + " Boss #%d").build(); - ThreadFactory workerFactory = builder.setNameFormat(name + " Worker #%d").build(); + return loopGroupQueue; + } - NioClientBossPool bossPool = new NioClientBossPool(Executors.newCachedThreadPool(bossFactory), 1, - new HashedWheelTimer(), ThreadNameDeterminer.CURRENT); - NioWorkerPool workerPool = new NioWorkerPool(Executors.newCachedThreadPool(workerFactory), workerNum, - ThreadNameDeterminer.CURRENT); + protected static EventLoopGroup createClientEventloopGroup(String name, int workerNum) { + if (LOG.isDebugEnabled()) { + LOG.debug("Create " + name + " ClientEventLoopGroup. Worker:" + workerNum); + } + + ThreadFactoryBuilder builder = new ThreadFactoryBuilder(); + ThreadFactory clientFactory = builder.setNameFormat(name + " Client #%d").build(); - return new NioClientSocketChannelFactory(bossPool, workerPool); + return new NioEventLoopGroup(workerNum, clientFactory); } // Client must release the external resources - public static synchronized ServerSocketChannelFactory createServerChannelFactory(String name, int workerNum) { + public static ServerBootstrap createServerChannelFactory(String name, int workerNum) { name = name + "-" + serverCount.incrementAndGet(); if(LOG.isInfoEnabled()){ LOG.info("Create " + name + " ServerSocketChannelFactory. Worker:" + workerNum); @@ -93,22 +145,38 @@ public final class RpcChannelFactory { ThreadFactoryBuilder builder = new ThreadFactoryBuilder(); ThreadFactory bossFactory = builder.setNameFormat(name + " Server Boss #%d").build(); ThreadFactory workerFactory = builder.setNameFormat(name + " Server Worker #%d").build(); - - NioServerBossPool bossPool = - new NioServerBossPool(Executors.newCachedThreadPool(bossFactory), 1, ThreadNameDeterminer.CURRENT); - NioWorkerPool workerPool = - new NioWorkerPool(Executors.newCachedThreadPool(workerFactory), workerNum, ThreadNameDeterminer.CURRENT); - - return new NioServerSocketChannelFactory(bossPool, workerPool); + + EventLoopGroup bossGroup = + new NioEventLoopGroup(1, bossFactory); + EventLoopGroup workerGroup = + new NioEventLoopGroup(workerNum, workerFactory); + + return new ServerBootstrap().group(bossGroup, workerGroup); } - public static synchronized void shutdown(){ + public static void shutdownGracefully(){ if(LOG.isDebugEnabled()) { LOG.debug("Shutdown Shared RPC Pool"); } - if (factory != null) { - factory.releaseExternalResources(); + + synchronized(lockObjectForLoopGroup) { + for (Queue<EventLoopGroup> eventLoopGroupQueue: eventLoopGroupPool.values()) { + for (EventLoopGroup eventLoopGroup: eventLoopGroupQueue) { + eventLoopGroup.shutdownGracefully(); + } + + eventLoopGroupQueue.clear(); + } + eventLoopGroupPool.clear(); + } + } + + static class CleanUpHandler extends Thread { + + @Override + public void run() { + RpcChannelFactory.shutdownGracefully(); } - factory = null; + } } http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java index c8e622b..4ad9771 100644 --- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java +++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcConnectionPool.java @@ -21,79 +21,71 @@ package org.apache.tajo.rpc; import com.google.common.base.Objects; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.jboss.netty.channel.ConnectTimeoutException; -import org.jboss.netty.channel.group.ChannelGroup; -import org.jboss.netty.channel.group.DefaultChannelGroup; -import org.jboss.netty.channel.socket.ClientSocketChannelFactory; -import org.jboss.netty.logging.CommonsLoggerFactory; -import org.jboss.netty.logging.InternalLoggerFactory; +import io.netty.channel.ConnectTimeoutException; +import io.netty.channel.group.ChannelGroup; +import io.netty.channel.group.DefaultChannelGroup; +import io.netty.util.concurrent.GlobalEventExecutor; +import io.netty.util.internal.logging.CommonsLoggerFactory; +import io.netty.util.internal.logging.InternalLoggerFactory; import java.net.InetSocketAddress; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.TimeUnit; +import java.util.HashMap; +import java.util.Map; public class RpcConnectionPool { private static final Log LOG = LogFactory.getLog(RpcConnectionPool.class); - private ConcurrentMap<RpcConnectionKey, NettyClientBase> connections = - new ConcurrentHashMap<RpcConnectionKey, NettyClientBase>(); - private ChannelGroup accepted = new DefaultChannelGroup(); + private Map<RpcConnectionKey, NettyClientBase> connections = + new HashMap<RpcConnectionKey, NettyClientBase>(); + private ChannelGroup accepted = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); private static RpcConnectionPool instance; - private final ClientSocketChannelFactory channelFactory; + private final Object lockObject = new Object(); public final static int RPC_RETRIES = 3; - private RpcConnectionPool(ClientSocketChannelFactory channelFactory) { - this.channelFactory = channelFactory; + private RpcConnectionPool() { } public synchronized static RpcConnectionPool getPool() { if(instance == null) { InternalLoggerFactory.setDefaultFactory(new CommonsLoggerFactory()); - instance = new RpcConnectionPool(RpcChannelFactory.getSharedClientChannelFactory()); + instance = new RpcConnectionPool(); } return instance; } - public synchronized static RpcConnectionPool newPool(String poolName, int workerNum) { - return new RpcConnectionPool(RpcChannelFactory.createClientChannelFactory(poolName, workerNum)); - } - private NettyClientBase makeConnection(RpcConnectionKey rpcConnectionKey) throws NoSuchMethodException, ClassNotFoundException, ConnectTimeoutException { NettyClientBase client; if(rpcConnectionKey.asyncMode) { - client = new AsyncRpcClient(rpcConnectionKey.protocolClass, rpcConnectionKey.addr, channelFactory, RPC_RETRIES); + client = new AsyncRpcClient(rpcConnectionKey.protocolClass, rpcConnectionKey.addr, + RPC_RETRIES); } else { - client = new BlockingRpcClient(rpcConnectionKey.protocolClass, rpcConnectionKey.addr, channelFactory, RPC_RETRIES); + client = new BlockingRpcClient(rpcConnectionKey.protocolClass, rpcConnectionKey.addr, + RPC_RETRIES); } accepted.add(client.getChannel()); return client; } public NettyClientBase getConnection(InetSocketAddress addr, - Class protocolClass, boolean asyncMode) + Class<?> protocolClass, boolean asyncMode) throws NoSuchMethodException, ClassNotFoundException, ConnectTimeoutException { RpcConnectionKey key = new RpcConnectionKey(addr, protocolClass, asyncMode); NettyClientBase client = connections.get(key); if (client == null) { - boolean added; - synchronized (connections){ - client = makeConnection(key); - connections.put(key, client); - added = true; - } - - if (!added) { - client.close(); + synchronized (lockObject){ client = connections.get(key); + if (client == null) { + client = makeConnection(key); + connections.put(key, client); + } } } - if (!client.getChannel().isOpen() || !client.getChannel().isConnected()) { + if (client.getChannel() == null || !client.getChannel().isOpen() || !client.getChannel().isActive()) { LOG.warn("Try to reconnect : " + addr); client.connect(addr); } @@ -104,9 +96,11 @@ public class RpcConnectionPool { if (client == null) return; try { - if (!client.getChannel().isOpen()) { - connections.remove(client.getKey()); - client.close(); + synchronized (lockObject) { + if (!client.getChannel().isOpen()) { + connections.remove(client.getKey()); + client.close(); + } } if(LOG.isDebugEnabled()) { @@ -128,8 +122,10 @@ public class RpcConnectionPool { LOG.debug("Close connection [" + client.getKey() + "]"); } - connections.remove(client.getKey()); - client.close(); + synchronized (lockObject) { + connections.remove(client.getKey()); + client.close(); + } } catch (Exception e) { LOG.error("Can't close connection:" + client.getKey() + ":" + e.getMessage(), e); @@ -140,7 +136,7 @@ public class RpcConnectionPool { if(LOG.isDebugEnabled()) { LOG.debug("Pool Closed"); } - synchronized(connections) { + synchronized(lockObject) { for(NettyClientBase eachClient: connections.values()) { try { eachClient.close(); @@ -148,11 +144,12 @@ public class RpcConnectionPool { LOG.error("close client pool error", e); } } + + connections.clear(); } - connections.clear(); try { - accepted.close().awaitUninterruptibly(10, TimeUnit.SECONDS); + accepted.close(); } catch (Throwable t) { LOG.error(t); } @@ -160,18 +157,16 @@ public class RpcConnectionPool { public synchronized void shutdown(){ close(); - if(channelFactory != null){ - channelFactory.releaseExternalResources(); - } + RpcChannelFactory.shutdownGracefully(); } static class RpcConnectionKey { final InetSocketAddress addr; - final Class protocolClass; + final Class<?> protocolClass; final boolean asyncMode; public RpcConnectionKey(InetSocketAddress addr, - Class protocolClass, boolean asyncMode) { + Class<?> protocolClass, boolean asyncMode) { this.addr = addr; this.protocolClass = protocolClass; this.asyncMode = asyncMode; http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-rpc/src/main/java/org/apache/tajo/rpc/ServerCallable.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/ServerCallable.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/ServerCallable.java index 140f781..fb1cec2 100644 --- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/ServerCallable.java +++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/ServerCallable.java @@ -18,30 +18,30 @@ package org.apache.tajo.rpc; -import com.google.protobuf.ServiceException; - import java.io.IOException; import java.lang.reflect.UndeclaredThrowableException; import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.List; +import com.google.protobuf.ServiceException; + public abstract class ServerCallable<T> { protected InetSocketAddress addr; protected long startTime; protected long endTime; - protected Class protocol; + protected Class<?> protocol; protected boolean asyncMode; protected boolean closeConn; protected RpcConnectionPool connPool; public abstract T call(NettyClientBase client) throws Exception; - public ServerCallable(RpcConnectionPool connPool, InetSocketAddress addr, Class protocol, boolean asyncMode) { + public ServerCallable(RpcConnectionPool connPool, InetSocketAddress addr, Class<?> protocol, boolean asyncMode) { this(connPool, addr, protocol, asyncMode, false); } - public ServerCallable(RpcConnectionPool connPool, InetSocketAddress addr, Class protocol, + public ServerCallable(RpcConnectionPool connPool, InetSocketAddress addr, Class<?> protocol, boolean asyncMode, boolean closeConn) { this.connPool = connPool; this.addr = addr; http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java b/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java index 61a92bc..31d5265 100644 --- a/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java +++ b/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestAsyncRpc.java @@ -27,13 +27,21 @@ import org.apache.tajo.rpc.test.TestProtos.EchoMessage; import org.apache.tajo.rpc.test.TestProtos.SumRequest; import org.apache.tajo.rpc.test.TestProtos.SumResponse; import org.apache.tajo.rpc.test.impl.DummyProtocolAsyncImpl; -import org.jboss.netty.channel.ConnectTimeoutException; -import org.jboss.netty.channel.socket.ClientSocketChannelFactory; -import org.junit.After; -import org.junit.Before; +import org.junit.AfterClass; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExternalResource; +import org.junit.runner.Description; +import org.junit.runners.model.Statement; +import io.netty.channel.ConnectTimeoutException; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; import java.net.InetSocketAddress; +import java.net.ServerSocket; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -47,43 +55,102 @@ public class TestAsyncRpc { double sum; String echo; - static AsyncRpcServer server; - static AsyncRpcClient client; - static Interface stub; - static DummyProtocolAsyncImpl service; - ClientSocketChannelFactory clientChannelFactory; + AsyncRpcServer server; + AsyncRpcClient client; + Interface stub; + DummyProtocolAsyncImpl service; int retries; + + @Retention(RetentionPolicy.RUNTIME) + @Target(ElementType.METHOD) + @interface SetupRpcConnection { + boolean setupRpcServer() default true; + boolean setupRpcClient() default true; + } + + @Rule + public ExternalResource resource = new ExternalResource() { + + private Description description; + + @Override + public Statement apply(Statement base, Description description) { + this.description = description; + return super.apply(base, description); + } - @Before - public void setUp() throws Exception { - retries = 1; + @Override + protected void before() throws Throwable { + SetupRpcConnection setupRpcConnection = description.getAnnotation(SetupRpcConnection.class); + + if (setupRpcConnection == null || setupRpcConnection.setupRpcServer()) { + setUpRpcServer(); + } + + if (setupRpcConnection == null || setupRpcConnection.setupRpcClient()) { + setUpRpcClient(); + } + } + + @Override + protected void after() { + SetupRpcConnection setupRpcConnection = description.getAnnotation(SetupRpcConnection.class); - clientChannelFactory = RpcChannelFactory.createClientChannelFactory("TestAsyncRpc", 2); + if (setupRpcConnection == null || setupRpcConnection.setupRpcClient()) { + try { + tearDownRpcClient(); + } catch (Exception e) { + fail(e.getMessage()); + } + } + + if (setupRpcConnection == null || setupRpcConnection.setupRpcServer()) { + try { + tearDownRpcServer(); + } catch (Exception e) { + fail(e.getMessage()); + } + } + } + + }; + + public void setUpRpcServer() throws Exception { service = new DummyProtocolAsyncImpl(); server = new AsyncRpcServer(DummyProtocol.class, service, new InetSocketAddress("127.0.0.1", 0), 2); server.start(); + } + + public void setUpRpcClient() throws Exception { + retries = 1; + client = new AsyncRpcClient(DummyProtocol.class, - RpcUtils.getConnectAddress(server.getListenAddress()), clientChannelFactory, retries); + RpcUtils.getConnectAddress(server.getListenAddress()), retries); stub = client.getStub(); } - @After - public void tearDown() throws Exception { - if(client != null) { - client.close(); - } - + @AfterClass + public static void tearDownClass() throws Exception { + RpcChannelFactory.shutdownGracefully(); + } + + public void tearDownRpcServer() throws Exception { if(server != null) { server.shutdown(); + server = null; } - - if (clientChannelFactory != null) { - clientChannelFactory.releaseExternalResources(); + } + + public void tearDownRpcClient() throws Exception { + if(client != null) { + client.close(); + client = null; } } boolean calledMarker = false; + @Test public void testRpc() throws Exception { @@ -130,7 +197,7 @@ public class TestAsyncRpc { testNullLatch.countDown(); } }); - testNullLatch.await(1000, TimeUnit.MILLISECONDS); + assertTrue(testNullLatch.await(1000, TimeUnit.MILLISECONDS)); assertTrue(service.getNullCalled); } @@ -169,8 +236,7 @@ public class TestAsyncRpc { .setMessage(MESSAGE).build(); CallFuture<EchoMessage> future = new CallFuture<EchoMessage>(); - server.shutdown(); - server = null; + tearDownRpcServer(); stub.echo(future.getController(), echoMessage, future); EchoMessage response = future.get(); @@ -187,8 +253,10 @@ public class TestAsyncRpc { .setMessage(MESSAGE).build(); CallFuture<EchoMessage> future = new CallFuture<EchoMessage>(); - server.shutdown(); - server = null; + if (server != null) { + server.shutdown(true); + server = null; + } stub = client.getStub(); stub.echo(future.getController(), echoMessage, future); @@ -200,10 +268,13 @@ public class TestAsyncRpc { } @Test + @SetupRpcConnection(setupRpcServer=false,setupRpcClient=false) public void testConnectionRetry() throws Exception { retries = 10; - final InetSocketAddress address = server.getListenAddress(); - tearDown(); + ServerSocket serverSocket = new ServerSocket(0); + final InetSocketAddress address = new InetSocketAddress("127.0.0.1", serverSocket.getLocalPort()); + serverSocket.close(); + service = new DummyProtocolAsyncImpl(); EchoMessage echoMessage = EchoMessage.newBuilder() .setMessage(MESSAGE).build(); @@ -214,7 +285,7 @@ public class TestAsyncRpc { @Override public void run() { try { - Thread.sleep(100); + Thread.sleep(1000); server = new AsyncRpcServer(DummyProtocol.class, service, address, 2); } catch (Exception e) { @@ -225,8 +296,7 @@ public class TestAsyncRpc { }); serverThread.start(); - clientChannelFactory = RpcChannelFactory.createClientChannelFactory(MESSAGE, 2); - client = new AsyncRpcClient(DummyProtocol.class, address, clientChannelFactory, retries); + client = new AsyncRpcClient(DummyProtocol.class, address, retries); stub = client.getStub(); stub.echo(future.getController(), echoMessage, future); @@ -240,7 +310,7 @@ public class TestAsyncRpc { InetSocketAddress address = new InetSocketAddress("test", 0); boolean expected = false; try { - new AsyncRpcClient(DummyProtocol.class, address, clientChannelFactory, retries); + new AsyncRpcClient(DummyProtocol.class, address, retries); fail(); } catch (ConnectTimeoutException e) { expected = true; @@ -251,13 +321,11 @@ public class TestAsyncRpc { } @Test + @SetupRpcConnection(setupRpcClient=false) public void testUnresolvedAddress() throws Exception { - client.close(); - client = null; - String hostAndPort = RpcUtils.normalizeInetSocketAddress(server.getListenAddress()); client = new AsyncRpcClient(DummyProtocol.class, - RpcUtils.createUnresolved(hostAndPort), clientChannelFactory, retries); + RpcUtils.createUnresolved(hostAndPort), retries); Interface stub = client.getStub(); EchoMessage echoMessage = EchoMessage.newBuilder() .setMessage(MESSAGE).build(); http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java b/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java index 746bfcb..07e2dca 100644 --- a/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java +++ b/tajo-rpc/src/test/java/org/apache/tajo/rpc/TestBlockingRpc.java @@ -24,13 +24,20 @@ import org.apache.tajo.rpc.test.TestProtos.EchoMessage; import org.apache.tajo.rpc.test.TestProtos.SumRequest; import org.apache.tajo.rpc.test.TestProtos.SumResponse; import org.apache.tajo.rpc.test.impl.DummyProtocolBlockingImpl; -import org.jboss.netty.channel.socket.ClientSocketChannelFactory; -import org.junit.After; -import org.junit.Before; +import org.junit.AfterClass; +import org.junit.Rule; import org.junit.Test; - +import org.junit.rules.ExternalResource; +import org.junit.runner.Description; +import org.junit.runners.model.Statement; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; import java.net.ConnectException; import java.net.InetSocketAddress; +import java.net.ServerSocket; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -44,35 +51,92 @@ public class TestBlockingRpc { private BlockingInterface stub; private DummyProtocolBlockingImpl service; private int retries; - private ClientSocketChannelFactory clientChannelFactory; - - @Before - public void setUp() throws Exception { - retries = 1; + + @Retention(RetentionPolicy.RUNTIME) + @Target(ElementType.METHOD) + @interface SetupRpcConnection { + boolean setupRpcServer() default true; + boolean setupRpcClient() default true; + } + + @Rule + public ExternalResource resource = new ExternalResource() { + + private Description description; + + @Override + public Statement apply(Statement base, Description description) { + this.description = description; + return super.apply(base, description); + } - clientChannelFactory = RpcChannelFactory.createClientChannelFactory(MESSAGE, 2); + @Override + protected void before() throws Throwable { + SetupRpcConnection setupRpcConnection = description.getAnnotation(SetupRpcConnection.class); + + if (setupRpcConnection == null || setupRpcConnection.setupRpcServer()) { + setUpRpcServer(); + } + + if (setupRpcConnection == null || setupRpcConnection.setupRpcClient()) { + setUpRpcClient(); + } + } + @Override + protected void after() { + SetupRpcConnection setupRpcConnection = description.getAnnotation(SetupRpcConnection.class); + + if (setupRpcConnection == null || setupRpcConnection.setupRpcClient()) { + try { + tearDownRpcClient(); + } catch (Exception e) { + fail(e.getMessage()); + } + } + + if (setupRpcConnection == null || setupRpcConnection.setupRpcServer()) { + try { + tearDownRpcServer(); + } catch (Exception e) { + fail(e.getMessage()); + } + } + } + + }; + + public void setUpRpcServer() throws Exception { service = new DummyProtocolBlockingImpl(); server = new BlockingRpcServer(DummyProtocol.class, service, new InetSocketAddress("127.0.0.1", 0), 2); server.start(); + } + + public void setUpRpcClient() throws Exception { + retries = 1; + client = new BlockingRpcClient(DummyProtocol.class, - RpcUtils.getConnectAddress(server.getListenAddress()), clientChannelFactory, retries); + RpcUtils.getConnectAddress(server.getListenAddress()), retries); stub = client.getStub(); } - @After - public void tearDown() throws Exception { - if(client != null) { - client.close(); - } - + @AfterClass + public static void tearDownClass() throws Exception { + RpcChannelFactory.shutdownGracefully(); + } + + public void tearDownRpcServer() throws Exception { if(server != null) { server.shutdown(); + server = null; } - - if(clientChannelFactory != null){ - clientChannelFactory.releaseExternalResources(); + } + + public void tearDownRpcClient() throws Exception { + if(client != null) { + client.close(); + client = null; } } @@ -93,8 +157,9 @@ public class TestBlockingRpc { } @Test + @SetupRpcConnection(setupRpcClient=false) public void testRpcWithServiceCallable() throws Exception { - RpcConnectionPool pool = RpcConnectionPool.newPool(getClass().getSimpleName(), 2); + RpcConnectionPool pool = RpcConnectionPool.getPool(); final SumRequest request = SumRequest.newBuilder() .setX1(1) .setX2(2) @@ -148,10 +213,12 @@ public class TestBlockingRpc { } @Test + @SetupRpcConnection(setupRpcServer=false,setupRpcClient=false) public void testConnectionRetry() throws Exception { retries = 10; - final InetSocketAddress address = server.getListenAddress(); - tearDown(); + ServerSocket serverSocket = new ServerSocket(0); + final InetSocketAddress address = new InetSocketAddress("127.0.0.1", serverSocket.getLocalPort()); + serverSocket.close(); EchoMessage message = EchoMessage.newBuilder() .setMessage(MESSAGE).build(); @@ -161,8 +228,8 @@ public class TestBlockingRpc { @Override public void run() { try { - Thread.sleep(100); - server = new BlockingRpcServer(DummyProtocol.class, service, address, 2); + Thread.sleep(1000); + server = new BlockingRpcServer(DummyProtocol.class, new DummyProtocolBlockingImpl(), address, 2); } catch (Exception e) { fail(e.getMessage()); } @@ -171,8 +238,7 @@ public class TestBlockingRpc { }); serverThread.start(); - clientChannelFactory = RpcChannelFactory.createClientChannelFactory(MESSAGE, 2); - client = new BlockingRpcClient(DummyProtocol.class, address, clientChannelFactory, retries); + client = new BlockingRpcClient(DummyProtocol.class, address, retries); stub = client.getStub(); EchoMessage response = stub.echo(null, message); @@ -182,14 +248,20 @@ public class TestBlockingRpc { @Test public void testConnectionFailed() throws Exception { boolean expected = false; + NettyClientBase client = null; + try { int port = server.getListenAddress().getPort() + 1; - new BlockingRpcClient(DummyProtocol.class, - RpcUtils.getConnectAddress(new InetSocketAddress("127.0.0.1", port)), clientChannelFactory, retries); + client = new BlockingRpcClient(DummyProtocol.class, + RpcUtils.getConnectAddress(new InetSocketAddress("127.0.0.1", port)), retries); + client.close(); fail("Connection should be failed."); } catch (ConnectException ce) { expected = true; } catch (Throwable ce){ + if (client != null) { + client.close(); + } fail(); } assertTrue(expected); @@ -240,7 +312,7 @@ public class TestBlockingRpc { }; shutdownThread.start(); - latch.await(5 * 1000, TimeUnit.MILLISECONDS); + assertTrue(latch.await(5 * 1000, TimeUnit.MILLISECONDS)); assertTrue(latch.getCount() == 0); @@ -254,13 +326,11 @@ public class TestBlockingRpc { } @Test + @SetupRpcConnection(setupRpcClient=false) public void testUnresolvedAddress() throws Exception { - client.close(); - client = null; - String hostAndPort = RpcUtils.normalizeInetSocketAddress(server.getListenAddress()); client = new BlockingRpcClient(DummyProtocol.class, - RpcUtils.createUnresolved(hostAndPort), clientChannelFactory, retries); + RpcUtils.createUnresolved(hostAndPort), retries); BlockingInterface stub = client.getStub(); EchoMessage message = EchoMessage.newBuilder() http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-rpc/src/test/java/org/apache/tajo/rpc/test/impl/DummyProtocolAsyncImpl.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/src/test/java/org/apache/tajo/rpc/test/impl/DummyProtocolAsyncImpl.java b/tajo-rpc/src/test/java/org/apache/tajo/rpc/test/impl/DummyProtocolAsyncImpl.java index 90499ce..0ca7563 100644 --- a/tajo-rpc/src/test/java/org/apache/tajo/rpc/test/impl/DummyProtocolAsyncImpl.java +++ b/tajo-rpc/src/test/java/org/apache/tajo/rpc/test/impl/DummyProtocolAsyncImpl.java @@ -27,7 +27,6 @@ import org.apache.tajo.rpc.test.TestProtos.EchoMessage; import org.apache.tajo.rpc.test.TestProtos.SumRequest; import org.apache.tajo.rpc.test.TestProtos.SumResponse; -@SuppressWarnings("UnusedDeclaration") public class DummyProtocolAsyncImpl implements Interface { private static final Log LOG = LogFactory.getLog(DummyProtocolAsyncImpl.class); @@ -74,7 +73,7 @@ public class DummyProtocolAsyncImpl implements Interface { try { Thread.sleep(3000); } catch (InterruptedException e) { - e.printStackTrace(); + LOG.error(e.getMessage()); } done.run(request); http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-storage/tajo-storage-hdfs/pom.xml ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/pom.xml b/tajo-storage/tajo-storage-hdfs/pom.xml index 5513aa6..957b4c1 100644 --- a/tajo-storage/tajo-storage-hdfs/pom.xml +++ b/tajo-storage/tajo-storage-hdfs/pom.xml @@ -168,6 +168,18 @@ limitations under the License. <dependencies> <dependency> + <groupId>io.netty</groupId> + <artifactId>netty-transport</artifactId> + </dependency> + <dependency> + <groupId>io.netty</groupId> + <artifactId>netty-codec</artifactId> + </dependency> + <dependency> + <groupId>io.netty</groupId> + <artifactId>netty-codec-http</artifactId> + </dependency> + <dependency> <groupId>org.apache.tajo</groupId> <artifactId>tajo-common</artifactId> <scope>provided</scope> http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/HttpFileServer.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/HttpFileServer.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/HttpFileServer.java index cf8a54e..389cd31 100644 --- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/HttpFileServer.java +++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/HttpFileServer.java @@ -21,13 +21,16 @@ package org.apache.tajo; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.net.NetUtils; -import org.jboss.netty.bootstrap.ServerBootstrap; -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.ChannelFactory; -import org.jboss.netty.channel.group.ChannelGroup; -import org.jboss.netty.channel.group.ChannelGroupFuture; -import org.jboss.netty.channel.group.DefaultChannelGroup; -import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.group.ChannelGroup; +import io.netty.channel.group.DefaultChannelGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.util.concurrent.GlobalEventExecutor; import java.net.InetSocketAddress; import java.util.concurrent.Executors; @@ -38,20 +41,20 @@ public class HttpFileServer { private final InetSocketAddress addr; private InetSocketAddress bindAddr; private ServerBootstrap bootstrap = null; - private ChannelFactory factory = null; + private EventLoopGroup eventloopGroup = null; private ChannelGroup channelGroup = null; public HttpFileServer(final InetSocketAddress addr) { this.addr = addr; - this.factory = new NioServerSocketChannelFactory( - Executors.newCachedThreadPool(), Executors.newCachedThreadPool(), - 2); + this.eventloopGroup = new NioEventLoopGroup(2, Executors.defaultThreadFactory()); // Configure the server. - this.bootstrap = new ServerBootstrap(factory); - // Set up the event pipeline factory. - this.bootstrap.setPipelineFactory(new HttpFileServerPipelineFactory()); - this.channelGroup = new DefaultChannelGroup(); + this.bootstrap = new ServerBootstrap(); + this.bootstrap.childHandler(new HttpFileServerChannelInitializer()) + .group(eventloopGroup) + .option(ChannelOption.TCP_NODELAY, true) + .channel(NioServerSocketChannel.class); + this.channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); } public HttpFileServer(String bindaddr) { @@ -60,9 +63,9 @@ public class HttpFileServer { public void start() { // Bind and start to accept incoming connections. - Channel channel = bootstrap.bind(addr); - channelGroup.add(channel); - this.bindAddr = (InetSocketAddress) channel.getLocalAddress(); + ChannelFuture future = bootstrap.bind(addr).syncUninterruptibly(); + channelGroup.add(future.channel()); + this.bindAddr = (InetSocketAddress) future.channel().localAddress(); LOG.info("HttpFileServer starts up (" + this.bindAddr.getAddress().getHostAddress() + ":" + this.bindAddr.getPort() + ")"); @@ -73,9 +76,8 @@ public class HttpFileServer { } public void stop() { - ChannelGroupFuture future = channelGroup.close(); - future.awaitUninterruptibly(); - factory.releaseExternalResources(); + channelGroup.close(); + eventloopGroup.shutdownGracefully(); LOG.info("HttpFileServer shutdown (" + this.bindAddr.getAddress().getHostAddress() + ":" http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/HttpFileServerChannelInitializer.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/HttpFileServerChannelInitializer.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/HttpFileServerChannelInitializer.java new file mode 100644 index 0000000..f2a97b6 --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/HttpFileServerChannelInitializer.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPipeline; +import io.netty.handler.codec.http.HttpObjectAggregator; +import io.netty.handler.codec.http.HttpRequestDecoder; +import io.netty.handler.codec.http.HttpResponseEncoder; +import io.netty.handler.stream.ChunkedWriteHandler; + +public class HttpFileServerChannelInitializer extends ChannelInitializer<Channel> { + + @Override + protected void initChannel(Channel channel) throws Exception { + ChannelPipeline pipeline = channel.pipeline(); + + // Uncomment the following lines if you want HTTPS + //SSLEngine engine = SecureChatSslContextFactory.getServerContext().createSSLEngine(); + //engine.setUseClientMode(false); + //pipeline.addLast("ssl", new SslHandler(engine)); + + pipeline.addLast("encoder", new HttpResponseEncoder()); + pipeline.addLast("decoder", new HttpRequestDecoder()); + pipeline.addLast("aggregator", new HttpObjectAggregator(65536)); + pipeline.addLast("chunkedWriter", new ChunkedWriteHandler()); + + pipeline.addLast("handler", new HttpFileServerHandler()); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/HttpFileServerHandler.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/HttpFileServerHandler.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/HttpFileServerHandler.java index 6c77317..78902f3 100644 --- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/HttpFileServerHandler.java +++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/HttpFileServerHandler.java @@ -18,16 +18,13 @@ package org.apache.tajo; -import org.jboss.netty.buffer.ChannelBuffers; -import org.jboss.netty.channel.*; -import org.jboss.netty.handler.codec.frame.TooLongFrameException; -import org.jboss.netty.handler.codec.http.DefaultHttpResponse; -import org.jboss.netty.handler.codec.http.HttpRequest; -import org.jboss.netty.handler.codec.http.HttpResponse; -import org.jboss.netty.handler.codec.http.HttpResponseStatus; -import org.jboss.netty.handler.ssl.SslHandler; -import org.jboss.netty.handler.stream.ChunkedFile; -import org.jboss.netty.util.CharsetUtil; +import io.netty.buffer.Unpooled; +import io.netty.channel.*; +import io.netty.handler.codec.TooLongFrameException; +import io.netty.handler.codec.http.*; +import io.netty.handler.ssl.SslHandler; +import io.netty.handler.stream.ChunkedFile; +import io.netty.util.CharsetUtil; import java.io.File; import java.io.FileNotFoundException; @@ -35,39 +32,34 @@ import java.io.RandomAccessFile; import java.io.UnsupportedEncodingException; import java.net.URLDecoder; -import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE; -import static org.jboss.netty.handler.codec.http.HttpHeaders.isKeepAlive; -import static org.jboss.netty.handler.codec.http.HttpHeaders.setContentLength; -import static org.jboss.netty.handler.codec.http.HttpMethod.GET; -import static org.jboss.netty.handler.codec.http.HttpResponseStatus.*; -import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; -/** - * this is an implementation copied from HttpStaticFileServerHandler.java of netty 3.6 - */ -public class HttpFileServerHandler extends SimpleChannelUpstreamHandler { +public class HttpFileServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> { + + private final Log LOG = LogFactory.getLog(HttpFileServerHandler.class); @Override - public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { - HttpRequest request = (HttpRequest) e.getMessage(); - if (request.getMethod() != GET) { - sendError(ctx, METHOD_NOT_ALLOWED); + public void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception { + + if (request.getMethod() != HttpMethod.GET) { + sendError(ctx, HttpResponseStatus.METHOD_NOT_ALLOWED); return; } final String path = sanitizeUri(request.getUri()); if (path == null) { - sendError(ctx, FORBIDDEN); + sendError(ctx, HttpResponseStatus.FORBIDDEN); return; } File file = new File(path); if (file.isHidden() || !file.exists()) { - sendError(ctx, NOT_FOUND); + sendError(ctx, HttpResponseStatus.NOT_FOUND); return; } if (!file.isFile()) { - sendError(ctx, FORBIDDEN); + sendError(ctx, HttpResponseStatus.FORBIDDEN); return; } @@ -75,62 +67,62 @@ public class HttpFileServerHandler extends SimpleChannelUpstreamHandler { try { raf = new RandomAccessFile(file, "r"); } catch (FileNotFoundException fnfe) { - sendError(ctx, NOT_FOUND); + sendError(ctx, HttpResponseStatus.NOT_FOUND); return; } long fileLength = raf.length(); - HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK); - setContentLength(response, fileLength); + HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); + HttpHeaders.setContentLength(response, fileLength); setContentTypeHeader(response); - Channel ch = e.getChannel(); - // Write the initial line and the header. - ch.write(response); + ctx.write(response); // Write the content. ChannelFuture writeFuture; - if (ch.getPipeline().get(SslHandler.class) != null) { + ChannelFuture lastContentFuture; + if (ctx.pipeline().get(SslHandler.class) != null) { // Cannot use zero-copy with HTTPS. - writeFuture = ch.write(new ChunkedFile(raf, 0, fileLength, 8192)); + lastContentFuture = ctx.writeAndFlush(new HttpChunkedInput(new ChunkedFile(raf, 0, fileLength, 8192))); } else { // No encryption - use zero-copy. - final FileRegion region = - new DefaultFileRegion(raf.getChannel(), 0, fileLength); - writeFuture = ch.write(region); - writeFuture.addListener(new ChannelFutureProgressListener() { - public void operationComplete(ChannelFuture future) { - region.releaseExternalResources(); + final FileRegion region = new DefaultFileRegion(raf.getChannel(), 0, fileLength); + writeFuture = ctx.write(region); + lastContentFuture = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT); + writeFuture.addListener(new ChannelProgressiveFutureListener() { + @Override + public void operationProgressed(ChannelProgressiveFuture future, long progress, long total) + throws Exception { + LOG.trace(String.format("%s: %d / %d", path, progress, total)); } - public void operationProgressed( - ChannelFuture future, long amount, long current, long total) { - System.out.printf("%s: %d / %d (+%d)%n", path, current, total, amount); + @Override + public void operationComplete(ChannelProgressiveFuture future) throws Exception { + region.release(); } }); } // Decide whether to close the connection or not. - if (!isKeepAlive(request)) { + if (!HttpHeaders.isKeepAlive(request)) { // Close the connection when the whole content is written out. - writeFuture.addListener(ChannelFutureListener.CLOSE); + lastContentFuture.addListener(ChannelFutureListener.CLOSE); } } @Override - public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { - Channel ch = e.getChannel(); - Throwable cause = e.getCause(); + Channel ch = ctx.channel(); if (cause instanceof TooLongFrameException) { - sendError(ctx, BAD_REQUEST); + sendError(ctx, HttpResponseStatus.BAD_REQUEST); return; } - cause.printStackTrace(); - if (ch.isConnected()) { - sendError(ctx, INTERNAL_SERVER_ERROR); + LOG.error(cause.getMessage(), cause); + if (ch.isActive()) { + sendError(ctx, HttpResponseStatus.INTERNAL_SERVER_ERROR); } } @@ -161,14 +153,13 @@ public class HttpFileServerHandler extends SimpleChannelUpstreamHandler { } private static void sendError(ChannelHandlerContext ctx, HttpResponseStatus status) { - HttpResponse response = new DefaultHttpResponse(HTTP_1_1, status); - response.setHeader(CONTENT_TYPE, "text/plain; charset=UTF-8"); - response.setContent(ChannelBuffers.copiedBuffer( - "Failure: " + status.toString() + "\r\n", + FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status, + Unpooled.copiedBuffer("Failure: " + status.toString() + "\r\n", CharsetUtil.UTF_8)); + response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain; charset=UTF-8"); // Close the connection as soon as the error message is sent. - ctx.getChannel().write(response).addListener(ChannelFutureListener.CLOSE); + ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE); } /** @@ -178,7 +169,7 @@ public class HttpFileServerHandler extends SimpleChannelUpstreamHandler { * HTTP response */ private static void setContentTypeHeader(HttpResponse response) { - response.setHeader(CONTENT_TYPE, "text/plain; charset=UTF-8"); + response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain; charset=UTF-8"); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/HttpFileServerPipelineFactory.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/HttpFileServerPipelineFactory.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/HttpFileServerPipelineFactory.java deleted file mode 100644 index cecf93b..0000000 --- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/HttpFileServerPipelineFactory.java +++ /dev/null @@ -1,54 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo; - -import org.jboss.netty.channel.ChannelPipeline; -import org.jboss.netty.channel.ChannelPipelineFactory; -import org.jboss.netty.handler.codec.http.HttpChunkAggregator; -import org.jboss.netty.handler.codec.http.HttpRequestDecoder; -import org.jboss.netty.handler.codec.http.HttpResponseEncoder; -import org.jboss.netty.handler.stream.ChunkedWriteHandler; - -import static org.jboss.netty.channel.Channels.pipeline; - -// Uncomment the following lines if you want HTTPS -//import javax.net.ssl.SSLEngine; -//import org.jboss.netty.example.securechat.SecureChatSslContextFactory; -//import org.jboss.netty.handler.ssl.SslHandler; - -//this class is copied from HttpStaticFileServerPipelineFactory.java of netty 3.6 -public class HttpFileServerPipelineFactory implements ChannelPipelineFactory { - public ChannelPipeline getPipeline() throws Exception { - // Create a default pipeline implementation. - ChannelPipeline pipeline = pipeline(); - - // Uncomment the following lines if you want HTTPS - //SSLEngine engine = SecureChatSslContextFactory.getServerContext().createSSLEngine(); - //engine.setUseClientMode(false); - //pipeline.addLast("ssl", new SslHandler(engine)); - - pipeline.addLast("decoder", new HttpRequestDecoder()); - pipeline.addLast("aggregator", new HttpChunkAggregator(65536)); - pipeline.addLast("encoder", new HttpResponseEncoder()); - pipeline.addLast("chunkedWriter", new ChunkedWriteHandler()); - - pipeline.addLast("handler", new HttpFileServerHandler()); - return pipeline; - } -} \ No newline at end of file
