TAJO-527: Upgrade to Netty 4 Closes #311
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/22876a82 Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/22876a82 Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/22876a82 Branch: refs/heads/index_support Commit: 22876a825e9d19b0f599c342d4ae3902d85f2c4d Parents: 64e47a4 Author: Jihun Kang <[email protected]> Authored: Tue Mar 3 22:10:21 2015 +0900 Committer: Jihun Kang <[email protected]> Committed: Tue Mar 3 22:10:21 2015 +0900 ---------------------------------------------------------------------- CHANGES | 1 + .../org/apache/tajo/client/QueryClientImpl.java | 3 +- .../apache/tajo/client/SessionConnection.java | 15 +- tajo-core/pom.xml | 4 + .../java/org/apache/tajo/master/TajoMaster.java | 2 +- .../tajo/worker/ExecutionBlockContext.java | 37 +-- .../java/org/apache/tajo/worker/Fetcher.java | 198 +++++++------- .../java/org/apache/tajo/worker/TajoWorker.java | 2 +- .../main/java/org/apache/tajo/worker/Task.java | 12 +- .../java/org/apache/tajo/worker/TaskRunner.java | 4 +- .../apache/tajo/worker/TaskRunnerManager.java | 13 +- .../apache/tajo/master/TestRepartitioner.java | 5 +- .../org/apache/tajo/worker/TestFetcher.java | 25 +- tajo-project/pom.xml | 24 +- tajo-pullserver/pom.xml | 8 + .../tajo/pullserver/FadvisedChunkedFile.java | 17 +- .../tajo/pullserver/FadvisedFileRegion.java | 16 +- .../tajo/pullserver/FileCloseListener.java | 8 +- .../HttpDataServerChannelInitializer.java | 58 +++++ .../tajo/pullserver/HttpDataServerHandler.java | 137 +++++----- .../HttpDataServerPipelineFactory.java | 56 ---- .../tajo/pullserver/PullServerAuxService.java | 229 ++++++++-------- .../tajo/pullserver/TajoPullServerService.java | 259 ++++++++++--------- .../retriever/AdvancedDataRetriever.java | 10 +- .../pullserver/retriever/DataRetriever.java | 4 +- .../retriever/DirectoryRetriever.java | 5 +- tajo-rpc/pom.xml | 10 +- .../org/apache/tajo/rpc/AsyncRpcClient.java | 106 +++++--- .../org/apache/tajo/rpc/AsyncRpcServer.java | 126 ++++----- .../org/apache/tajo/rpc/BlockingRpcClient.java | 122 +++++---- .../org/apache/tajo/rpc/BlockingRpcServer.java | 125 +++++---- .../java/org/apache/tajo/rpc/CallFuture.java | 8 +- .../apache/tajo/rpc/DefaultRpcController.java | 7 +- .../org/apache/tajo/rpc/NettyClientBase.java | 133 ++++++---- .../org/apache/tajo/rpc/NettyServerBase.java | 82 +++--- .../java/org/apache/tajo/rpc/NullCallback.java | 2 +- .../tajo/rpc/ProtoChannelInitializer.java | 50 ++++ .../apache/tajo/rpc/ProtoPipelineFactory.java | 50 ---- .../org/apache/tajo/rpc/RpcChannelFactory.java | 160 ++++++++---- .../org/apache/tajo/rpc/RpcConnectionPool.java | 87 +++---- .../org/apache/tajo/rpc/ServerCallable.java | 10 +- .../java/org/apache/tajo/rpc/TestAsyncRpc.java | 144 ++++++++--- .../org/apache/tajo/rpc/TestBlockingRpc.java | 138 +++++++--- .../rpc/test/impl/DummyProtocolAsyncImpl.java | 3 +- tajo-storage/tajo-storage-hdfs/pom.xml | 12 + .../java/org/apache/tajo/HttpFileServer.java | 44 ++-- .../tajo/HttpFileServerChannelInitializer.java | 47 ++++ .../org/apache/tajo/HttpFileServerHandler.java | 109 ++++---- .../tajo/HttpFileServerPipelineFactory.java | 54 ---- 49 files changed, 1552 insertions(+), 1229 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index e8c8b18..668c0db 100644 --- a/CHANGES +++ b/CHANGES @@ -7,6 +7,7 @@ Release 0.11.0 - unreleased IMPROVEMENT + TAJO-527: Upgrade to Netty 4. (jihun) BUG FIXES http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java b/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java index bc89679..fae613a 100644 --- a/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java +++ b/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java @@ -19,6 +19,7 @@ package org.apache.tajo.client; import com.google.protobuf.ServiceException; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.tajo.*; @@ -32,6 +33,7 @@ import org.apache.tajo.ipc.TajoMasterClientProtocol; import org.apache.tajo.jdbc.FetchResultSet; import org.apache.tajo.jdbc.TajoMemoryResultSet; import org.apache.tajo.rpc.NettyClientBase; +import org.apache.tajo.rpc.RpcChannelFactory; import org.apache.tajo.rpc.ServerCallable; import org.apache.tajo.util.ProtoUtil; @@ -83,7 +85,6 @@ public class QueryClientImpl implements QueryClient { @Override public void close() { - } @Override http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java index f8762da..bcf6d8b 100644 --- a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java +++ b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java @@ -34,7 +34,8 @@ import org.apache.tajo.rpc.ServerCallable; import org.apache.tajo.service.ServiceTracker; import org.apache.tajo.util.KeyValueSet; import org.apache.tajo.util.ProtoUtil; -import org.jboss.netty.channel.ConnectTimeoutException; + +import io.netty.channel.ConnectTimeoutException; import java.io.Closeable; import java.io.IOException; @@ -84,11 +85,7 @@ public class SessionConnection implements Closeable { this.properties = properties; - //TODO separate ConfVars from TajoConf - int workerNum = this.properties.getInt("tajo.rpc.client.worker-thread-num", 4); - - // Don't share connection pool per client - connPool = RpcConnectionPool.newPool(getClass().getSimpleName(), workerNum); + connPool = RpcConnectionPool.getPool(); userInfo = UserRoleInfo.getCurrentUser(); this.baseDatabase = baseDatabase != null ? baseDatabase : null; @@ -130,7 +127,7 @@ public class SessionConnection implements Closeable { if(!closed.get()){ try { return connPool.getConnection(serviceTracker.getClientServiceAddress(), - TajoMasterClientProtocol.class, false).isConnected(); + TajoMasterClientProtocol.class, false).isActive(); } catch (Throwable e) { return false; } @@ -288,10 +285,6 @@ public class SessionConnection implements Closeable { } catch (Throwable e) { } - - if(connPool != null) { - connPool.shutdown(); - } } protected InetSocketAddress getTajoMasterAddr() { http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-core/pom.xml ---------------------------------------------------------------------- diff --git a/tajo-core/pom.xml b/tajo-core/pom.xml index ce9db73..d3c7ed6 100644 --- a/tajo-core/pom.xml +++ b/tajo-core/pom.xml @@ -388,6 +388,10 @@ <version>3.1.1</version> </dependency> <dependency> + <groupId>io.netty</groupId> + <artifactId>netty-codec-http</artifactId> + </dependency> + <dependency> <groupId>org.mortbay.jetty</groupId> <artifactId>jetty</artifactId> <version>6.1.14</version> http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java index 586abb0..6f7c5a9 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java @@ -559,7 +559,7 @@ public class TajoMaster extends CompositeService { LOG.info("TajoMaster received SIGINT Signal"); LOG.info("============================================"); stop(); - RpcChannelFactory.shutdown(); + RpcChannelFactory.shutdownGracefully(); } } } http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java index 8cf94eb..813c502 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java @@ -42,9 +42,10 @@ import org.apache.tajo.storage.HashShuffleAppenderManager; import org.apache.tajo.storage.StorageUtil; import org.apache.tajo.util.NetUtils; import org.apache.tajo.util.Pair; -import org.jboss.netty.channel.ConnectTimeoutException; -import org.jboss.netty.channel.socket.ClientSocketChannelFactory; -import org.jboss.netty.util.Timer; +import org.apache.tajo.worker.event.TaskRunnerStartEvent; + +import io.netty.channel.ConnectTimeoutException; +import io.netty.channel.EventLoopGroup; import java.io.IOException; import java.net.InetSocketAddress; @@ -67,7 +68,7 @@ public class ExecutionBlockContext { public AtomicInteger killedTasksNum = new AtomicInteger(); public AtomicInteger failedTasksNum = new AtomicInteger(); - private ClientSocketChannelFactory channelFactory; + private EventLoopGroup loopGroup; // for temporal or intermediate files private FileSystem localFS; // for input files @@ -184,12 +185,6 @@ public class ExecutionBlockContext { tasks.clear(); resource.release(); - - try { - releaseShuffleChannelFactory(); - } catch (Throwable e) { - LOG.error(e.getMessage(), e); - } } public TajoConf getConf() { @@ -267,30 +262,10 @@ public class ExecutionBlockContext { return histories.get(runner.getId()); } - public TajoWorker.WorkerContext getWorkerContext() { + public TajoWorker.WorkerContext getWorkerContext(){ return workerContext; } - protected ClientSocketChannelFactory getShuffleChannelFactory(){ - if(channelFactory == null) { - int workerNum = getConf().getIntVar(TajoConf.ConfVars.SHUFFLE_RPC_CLIENT_WORKER_THREAD_NUM); - channelFactory = RpcChannelFactory.createClientChannelFactory("Fetcher", workerNum); - } - return channelFactory; - } - - public Timer getRPCTimer() { - return manager.getRPCTimer(); - } - - protected void releaseShuffleChannelFactory(){ - if(channelFactory != null) { - channelFactory.shutdown(); - channelFactory.releaseExternalResources(); - channelFactory = null; - } - } - private void sendExecutionBlockReport(ExecutionBlockReport reporter) throws Exception { getQueryMasterStub().doneExecutionBlock(null, reporter, NullCallback.get()); } http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java b/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java index 742a025..fc57a96 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java @@ -18,20 +18,33 @@ package org.apache.tajo.worker; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.channel.*; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.io.IOUtils; import org.apache.tajo.TajoProtos; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.pullserver.retriever.FileChunk; -import org.jboss.netty.bootstrap.ClientBootstrap; -import org.jboss.netty.buffer.ChannelBuffer; -import org.jboss.netty.channel.*; -import org.jboss.netty.channel.socket.ClientSocketChannelFactory; -import org.jboss.netty.handler.codec.http.*; -import org.jboss.netty.handler.timeout.ReadTimeoutException; -import org.jboss.netty.handler.timeout.ReadTimeoutHandler; -import org.jboss.netty.util.Timer; +import org.apache.tajo.rpc.RpcChannelFactory; + +import io.netty.bootstrap.Bootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.codec.http.DefaultHttpRequest; +import io.netty.handler.codec.http.HttpClientCodec; +import io.netty.handler.codec.http.HttpContent; +import io.netty.handler.codec.http.HttpContentDecompressor; +import io.netty.handler.codec.http.HttpHeaders; +import io.netty.handler.codec.http.HttpMethod; +import io.netty.handler.codec.http.HttpRequest; +import io.netty.handler.codec.http.HttpResponse; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.handler.codec.http.HttpVersion; +import io.netty.handler.codec.http.LastHttpContent; +import io.netty.handler.timeout.ReadTimeoutException; +import io.netty.handler.timeout.ReadTimeoutHandler; +import io.netty.util.ReferenceCountUtil; import java.io.File; import java.io.FileNotFoundException; @@ -40,8 +53,7 @@ import java.io.RandomAccessFile; import java.net.InetSocketAddress; import java.net.URI; import java.nio.channels.FileChannel; - -import static org.jboss.netty.channel.Channels.pipeline; +import java.util.concurrent.TimeUnit; /** * Fetcher fetches data from a given uri via HTTP protocol and stores them into @@ -64,17 +76,15 @@ public class Fetcher { private long fileLen; private int messageReceiveCount; private TajoProtos.FetcherState state; - private Timer timer; - private ClientBootstrap bootstrap; + private Bootstrap bootstrap; - public Fetcher(TajoConf conf, URI uri, FileChunk chunk, ClientSocketChannelFactory factory, Timer timer) { + public Fetcher(TajoConf conf, URI uri, FileChunk chunk) { this.uri = uri; this.fileChunk = chunk; this.useLocalFile = !chunk.fromRemote(); this.state = TajoProtos.FetcherState.FETCH_INIT; this.conf = conf; - this.timer = timer; String scheme = uri.getScheme() == null ? "http" : uri.getScheme(); this.host = uri.getHost() == null ? "localhost" : uri.getHost(); @@ -88,13 +98,18 @@ public class Fetcher { } if (!useLocalFile) { - bootstrap = new ClientBootstrap(factory); - bootstrap.setOption("connectTimeoutMillis", 5000L); // set 5 sec - bootstrap.setOption("receiveBufferSize", 1048576); // set 1M - bootstrap.setOption("tcpNoDelay", true); - - ChannelPipelineFactory pipelineFactory = new HttpClientPipelineFactory(fileChunk.getFile()); - bootstrap.setPipelineFactory(pipelineFactory); + bootstrap = new Bootstrap() + .group( + RpcChannelFactory.getSharedClientEventloopGroup(RpcChannelFactory.ClientChannelId.FETCHER, + conf.getIntVar(TajoConf.ConfVars.SHUFFLE_RPC_CLIENT_WORKER_THREAD_NUM))) + .channel(NioSocketChannel.class) + .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) + .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000) // set 5 sec + .option(ChannelOption.SO_RCVBUF, 1048576) // set 1M + .option(ChannelOption.TCP_NODELAY, true); + + ChannelInitializer<Channel> initializer = new HttpClientChannelInitializer(fileChunk.getFile()); + bootstrap.handler(initializer); } } @@ -132,30 +147,30 @@ public class Fetcher { this.state = TajoProtos.FetcherState.FETCH_FETCHING; ChannelFuture future = null; try { - future = bootstrap.connect(new InetSocketAddress(host, port)); + future = bootstrap.clone().connect(new InetSocketAddress(host, port)) + .addListener(ChannelFutureListener.CLOSE_ON_FAILURE); // Wait until the connection attempt succeeds or fails. - Channel channel = future.awaitUninterruptibly().getChannel(); + Channel channel = future.awaitUninterruptibly().channel(); if (!future.isSuccess()) { - future.getChannel().close(); state = TajoProtos.FetcherState.FETCH_FAILED; - throw new IOException(future.getCause()); + throw new IOException(future.cause()); } String query = uri.getPath() + (uri.getRawQuery() != null ? "?" + uri.getRawQuery() : ""); // Prepare the HTTP request. HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, query); - request.setHeader(HttpHeaders.Names.HOST, host); - request.setHeader(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE); - request.setHeader(HttpHeaders.Names.ACCEPT_ENCODING, HttpHeaders.Values.GZIP); + request.headers().set(HttpHeaders.Names.HOST, host); + request.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE); + request.headers().set(HttpHeaders.Names.ACCEPT_ENCODING, HttpHeaders.Values.GZIP); LOG.info("Status: " + getState() + ", URI:" + uri); // Send the HTTP request. - ChannelFuture channelFuture = channel.write(request); + ChannelFuture channelFuture = channel.writeAndFlush(request); // Wait for the server to close the connection. - channel.getCloseFuture().awaitUninterruptibly(); + channel.closeFuture().awaitUninterruptibly(); channelFuture.addListener(ChannelFutureListener.CLOSE); @@ -164,7 +179,7 @@ public class Fetcher { } finally { if(future != null){ // Close the channel to exit. - future.getChannel().close(); + future.channel().close(); } this.finishTime = System.currentTimeMillis(); @@ -176,8 +191,7 @@ public class Fetcher { return this.uri; } - class HttpClientHandler extends SimpleChannelUpstreamHandler { - private volatile boolean readingChunks; + class HttpClientHandler extends ChannelInboundHandlerAdapter { private final File file; private RandomAccessFile raf; private FileChannel fc; @@ -185,27 +199,27 @@ public class Fetcher { public HttpClientHandler(File file) throws FileNotFoundException { this.file = file; + this.raf = new RandomAccessFile(file, "rw"); + this.fc = raf.getChannel(); } @Override - public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { messageReceiveCount++; - try { - if (!readingChunks && e.getMessage() instanceof HttpResponse) { - - HttpResponse response = (HttpResponse) e.getMessage(); + if (msg instanceof HttpResponse) { + try { + HttpResponse response = (HttpResponse) msg; StringBuilder sb = new StringBuilder(); if (LOG.isDebugEnabled()) { - sb.append("STATUS: ").append(response.getStatus()) - .append(", VERSION: ").append(response.getProtocolVersion()) - .append(", HEADER: "); + sb.append("STATUS: ").append(response.getStatus()).append(", VERSION: ") + .append(response.getProtocolVersion()).append(", HEADER: "); } - if (!response.getHeaderNames().isEmpty()) { - for (String name : response.getHeaderNames()) { - for (String value : response.getHeaders(name)) { + if (!response.headers().names().isEmpty()) { + for (String name : response.headers().names()) { + for (String value : response.headers().getAll(name)) { if (LOG.isDebugEnabled()) { sb.append(name).append(" = ").append(value); } @@ -219,109 +233,99 @@ public class Fetcher { LOG.debug(sb.toString()); } - if (response.getStatus().getCode() == HttpResponseStatus.NO_CONTENT.getCode()) { + if (response.getStatus().code() == HttpResponseStatus.NO_CONTENT.code()) { LOG.warn("There are no data corresponding to the request"); length = 0; return; - } else if (response.getStatus().getCode() != HttpResponseStatus.OK.getCode()){ - LOG.error(response.getStatus().getReasonPhrase()); + } else if (response.getStatus().code() != HttpResponseStatus.OK.code()) { + LOG.error(response.getStatus().reasonPhrase()); state = TajoProtos.FetcherState.FETCH_FAILED; return; } + } catch (Exception e) { + LOG.error(e.getMessage()); + } finally { + ReferenceCountUtil.release(msg); + } + } - this.raf = new RandomAccessFile(file, "rw"); - this.fc = raf.getChannel(); + if (msg instanceof HttpContent) { + try { + HttpContent httpContent = (HttpContent) msg; + ByteBuf content = httpContent.content(); + if (content.isReadable()) { + content.readBytes(fc, content.readableBytes()); + } - if (response.isChunked()) { - readingChunks = true; - } else { - ChannelBuffer content = response.getContent(); - if (content.readable()) { - fc.write(content.toByteBuffer()); + if (msg instanceof LastHttpContent) { + if (raf != null) { + fileLen = file.length(); } - } - } else { - HttpChunk chunk = (HttpChunk) e.getMessage(); - if (chunk.isLast()) { - readingChunks = false; - long fileLength = file.length(); - if (fileLength == length) { - LOG.info("Data fetch is done (total received bytes: " + fileLength - + ")"); - } else { - LOG.info("Data fetch is done, but cannot get all data " - + "(received/total: " + fileLength + "/" + length + ")"); + + IOUtils.cleanup(LOG, fc, raf); + if (ctx.channel().isActive()) { + ctx.channel().close(); } - } else { - if(fc != null){ - fc.write(chunk.getContent().toByteBuffer()); + finishTime = System.currentTimeMillis(); + if (state != TajoProtos.FetcherState.FETCH_FAILED) { + state = TajoProtos.FetcherState.FETCH_FINISHED; } } - } - } finally { - if(raf != null) { - fileLen = file.length(); - } - - if(fileLen == length){ - IOUtils.cleanup(LOG, fc, raf); - finishTime = System.currentTimeMillis(); - state = TajoProtos.FetcherState.FETCH_FINISHED; + } catch (Exception e) { + LOG.error(e.getMessage()); + } finally { + ReferenceCountUtil.release(msg); } } } @Override - public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { - if (e.getCause() instanceof ReadTimeoutException) { - LOG.warn(e.getCause()); + if (cause instanceof ReadTimeoutException) { + LOG.warn(cause); } else { - LOG.error("Fetch failed :", e.getCause()); + LOG.error("Fetch failed :", cause); } // this fetching will be retry IOUtils.cleanup(LOG, fc, raf); - if(ctx.getChannel().isConnected()){ - ctx.getChannel().close(); - } finishTime = System.currentTimeMillis(); state = TajoProtos.FetcherState.FETCH_FAILED; + ctx.close(); } @Override - public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { - super.channelDisconnected(ctx, e); - + public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { if(getState() != TajoProtos.FetcherState.FETCH_FINISHED){ //channel is closed, but cannot complete fetcher finishTime = System.currentTimeMillis(); state = TajoProtos.FetcherState.FETCH_FAILED; } IOUtils.cleanup(LOG, fc, raf); + + super.channelUnregistered(ctx); } } - class HttpClientPipelineFactory implements - ChannelPipelineFactory { + class HttpClientChannelInitializer extends ChannelInitializer<Channel> { private final File file; - public HttpClientPipelineFactory(File file) { + public HttpClientChannelInitializer(File file) { this.file = file; } @Override - public ChannelPipeline getPipeline() throws Exception { - ChannelPipeline pipeline = pipeline(); + protected void initChannel(Channel channel) throws Exception { + ChannelPipeline pipeline = channel.pipeline(); int maxChunkSize = conf.getIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_CHUNK_MAX_SIZE); int readTimeout = conf.getIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_READ_TIMEOUT); pipeline.addLast("codec", new HttpClientCodec(4096, 8192, maxChunkSize)); pipeline.addLast("inflater", new HttpContentDecompressor()); - pipeline.addLast("timeout", new ReadTimeoutHandler(timer, readTimeout)); + pipeline.addLast("timeout", new ReadTimeoutHandler(readTimeout, TimeUnit.SECONDS)); pipeline.addLast("handler", new HttpClientHandler(file)); - return pipeline; } } } http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java index 7e2a233..3c55add 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java @@ -604,7 +604,7 @@ public class TajoWorker extends CompositeService { LOG.info("TajoWorker received SIGINT Signal"); LOG.info("============================================"); stop(); - RpcChannelFactory.shutdown(); + RpcChannelFactory.shutdownGracefully(); } } } http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-core/src/main/java/org/apache/tajo/worker/Task.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java index df3be12..ef94337 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java @@ -22,6 +22,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import io.netty.channel.EventLoopGroup; import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -55,9 +56,8 @@ import org.apache.tajo.rpc.NullCallback; import org.apache.tajo.storage.*; import org.apache.tajo.storage.fragment.FileFragment; import org.apache.tajo.util.NetUtils; -import org.jboss.netty.channel.socket.ClientSocketChannelFactory; -import org.jboss.netty.handler.codec.http.QueryStringDecoder; -import org.jboss.netty.util.Timer; + +import io.netty.handler.codec.http.QueryStringDecoder; import java.io.File; import java.io.IOException; @@ -664,8 +664,6 @@ public class Task { List<FetchImpl> fetches) throws IOException { if (fetches.size() > 0) { - ClientSocketChannelFactory channelFactory = executionBlockContext.getShuffleChannelFactory(); - Timer timer = executionBlockContext.getRPCTimer(); Path inputDir = executionBlockContext.getLocalDirAllocator(). getLocalPathToRead(getTaskAttemptDir(ctx.getTaskId()).toString(), systemConf); @@ -716,7 +714,7 @@ public class Task { // If we decide that intermediate data should be really fetched from a remote host, storeChunk // represents a complete file. Otherwise, storeChunk may represent a complete file or only a part of it storeChunk.setEbId(f.getName()); - Fetcher fetcher = new Fetcher(systemConf, uri, storeChunk, channelFactory, timer); + Fetcher fetcher = new Fetcher(systemConf, uri, storeChunk); LOG.info("Create a new Fetcher with storeChunk:" + storeChunk.toString()); runnerList.add(fetcher); i++; @@ -732,7 +730,7 @@ public class Task { private FileChunk getLocalStoredFileChunk(URI fetchURI, TajoConf conf) throws IOException { // Parse the URI LOG.info("getLocalStoredFileChunk starts"); - final Map<String, List<String>> params = new QueryStringDecoder(fetchURI.toString()).getParameters(); + final Map<String, List<String>> params = new QueryStringDecoder(fetchURI.toString()).parameters(); final List<String> types = params.get("type"); final List<String> qids = params.get("qid"); final List<String> taskIdList = params.get("ta"); http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java index cf50767..2cdebc8 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java @@ -19,6 +19,7 @@ package org.apache.tajo.worker; import com.google.common.util.concurrent.ThreadFactoryBuilder; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -35,7 +36,8 @@ import org.apache.tajo.master.container.TajoContainerIdPBImpl; import org.apache.tajo.master.container.TajoConverterUtils; import org.apache.tajo.rpc.CallFuture; import org.apache.tajo.rpc.NullCallback; -import org.jboss.netty.channel.ConnectTimeoutException; + +import io.netty.channel.ConnectTimeoutException; import java.util.concurrent.*; http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java index 570bd38..3f4a1b8 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java @@ -20,6 +20,7 @@ package org.apache.tajo.worker; import com.google.common.base.Preconditions; import com.google.common.collect.Maps; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -33,8 +34,6 @@ import org.apache.tajo.engine.utils.TupleCache; import org.apache.tajo.worker.event.TaskRunnerEvent; import org.apache.tajo.worker.event.TaskRunnerStartEvent; import org.apache.tajo.worker.event.TaskRunnerStopEvent; -import org.jboss.netty.util.HashedWheelTimer; -import org.jboss.netty.util.Timer; import java.io.IOException; import java.util.*; @@ -52,7 +51,6 @@ public class TaskRunnerManager extends CompositeService implements EventHandler< private AtomicBoolean stop = new AtomicBoolean(false); private FinishedTaskCleanThread finishedTaskCleanThread; private Dispatcher dispatcher; - private HashedWheelTimer rpcTimer; public TaskRunnerManager(TajoWorker.WorkerContext workerContext, Dispatcher dispatcher) { super(TaskRunnerManager.class.getName()); @@ -77,7 +75,6 @@ public class TaskRunnerManager extends CompositeService implements EventHandler< public void start() { finishedTaskCleanThread = new FinishedTaskCleanThread(); finishedTaskCleanThread.start(); - rpcTimer = new HashedWheelTimer(); super.start(); } @@ -102,10 +99,6 @@ public class TaskRunnerManager extends CompositeService implements EventHandler< finishedTaskCleanThread.interrupted(); } - if(rpcTimer != null){ - rpcTimer.stop(); - } - super.stop(); if(workerContext.isYarnContainerMode()) { workerContext.stopWorker(true); @@ -214,10 +207,6 @@ public class TaskRunnerManager extends CompositeService implements EventHandler< return tajoConf; } - public Timer getRPCTimer(){ - return rpcTimer; - } - class FinishedTaskCleanThread extends Thread { //TODO if history size is large, the historyMap should remove immediately public void run() { http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java b/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java index 438867e..9910d79 100644 --- a/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java +++ b/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java @@ -31,9 +31,10 @@ import org.apache.tajo.querymaster.Repartitioner; import org.apache.tajo.util.Pair; import org.apache.tajo.util.TUtil; import org.apache.tajo.worker.FetchImpl; -import org.jboss.netty.handler.codec.http.QueryStringDecoder; import org.junit.Test; +import io.netty.handler.codec.http.QueryStringDecoder; + import java.net.URI; import java.util.*; @@ -89,7 +90,7 @@ public class TestRepartitioner { URI uri = uris.get(0); final Map<String, List<String>> params = - new QueryStringDecoder(uri).getParameters(); + new QueryStringDecoder(uri).parameters(); assertEquals(eachEntry.getKey().toString(), params.get("p").get(0)); assertEquals("h", params.get("type").get(0)); http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java b/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java index b3654f9..513eb69 100644 --- a/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java +++ b/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java @@ -27,15 +27,9 @@ import org.apache.tajo.conf.TajoConf; import org.apache.tajo.conf.TajoConf.ConfVars; import org.apache.tajo.pullserver.TajoPullServerService; import org.apache.tajo.pullserver.retriever.FileChunk; -import org.apache.tajo.rpc.RpcChannelFactory; import org.apache.tajo.storage.HashShuffleAppenderManager; import org.apache.tajo.util.CommonTestingUtil; -import org.jboss.netty.channel.socket.ClientSocketChannelFactory; -import org.jboss.netty.util.HashedWheelTimer; -import org.jboss.netty.util.Timer; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; +import org.junit.*; import java.io.File; import java.io.IOException; @@ -50,8 +44,6 @@ public class TestFetcher { private String OUTPUT_DIR = TEST_DATA+"/out/"; private TajoConf conf = new TajoConf(); private TajoPullServerService pullServerService; - private ClientSocketChannelFactory channelFactory; - private Timer timer; @Before public void setUp() throws Exception { @@ -65,16 +57,11 @@ public class TestFetcher { pullServerService = new TajoPullServerService(); pullServerService.init(conf); pullServerService.start(); - - channelFactory = RpcChannelFactory.createClientChannelFactory("Fetcher", 1); - timer = new HashedWheelTimer(); } @After public void tearDown(){ pullServerService.stop(); - channelFactory.releaseExternalResources(); - timer.stop(); } @Test @@ -102,7 +89,7 @@ public class TestFetcher { URI uri = URI.create("http://127.0.0.1:" + pullServerService.getPort() + "/?" + params); FileChunk storeChunk = new FileChunk(new File(OUTPUT_DIR + "data"), 0, 0); storeChunk.setFromRemote(true); - final Fetcher fetcher = new Fetcher(conf, uri, storeChunk, channelFactory, timer); + final Fetcher fetcher = new Fetcher(conf, uri, storeChunk); FileChunk chunk = fetcher.get(); assertNotNull(chunk); assertNotNull(chunk.getFile()); @@ -148,7 +135,7 @@ public class TestFetcher { URI uri = URI.create("http://127.0.0.1:" + pullServerService.getPort() + "/?" + params); FileChunk storeChunk = new FileChunk(new File(OUTPUT_DIR + "data"), 0, 0); storeChunk.setFromRemote(true); - final Fetcher fetcher = new Fetcher(conf, uri, storeChunk, channelFactory, timer); + final Fetcher fetcher = new Fetcher(conf, uri, storeChunk); assertEquals(TajoProtos.FetcherState.FETCH_INIT, fetcher.getState()); fetcher.get(); @@ -178,7 +165,7 @@ public class TestFetcher { URI uri = URI.create("http://127.0.0.1:" + pullServerService.getPort() + "/?" + params); FileChunk storeChunk = new FileChunk(new File(OUTPUT_DIR + "data"), 0, 0); storeChunk.setFromRemote(true); - final Fetcher fetcher = new Fetcher(conf, uri, storeChunk, channelFactory, timer); + final Fetcher fetcher = new Fetcher(conf, uri, storeChunk); assertEquals(TajoProtos.FetcherState.FETCH_INIT, fetcher.getState()); fetcher.get(); @@ -212,7 +199,7 @@ public class TestFetcher { URI uri = URI.create("http://127.0.0.1:" + pullServerService.getPort() + "/?" + params); FileChunk storeChunk = new FileChunk(new File(OUTPUT_DIR + "data"), 0, 0); storeChunk.setFromRemote(true); - final Fetcher fetcher = new Fetcher(conf, uri, storeChunk, channelFactory, timer); + final Fetcher fetcher = new Fetcher(conf, uri, storeChunk); assertEquals(TajoProtos.FetcherState.FETCH_INIT, fetcher.getState()); fetcher.get(); @@ -232,7 +219,7 @@ public class TestFetcher { URI uri = URI.create("http://127.0.0.1:" + pullServerService.getPort() + "/?" + params); FileChunk storeChunk = new FileChunk(new File(OUTPUT_DIR + "data"), 0, 0); storeChunk.setFromRemote(true); - final Fetcher fetcher = new Fetcher(conf, uri, storeChunk, channelFactory, timer); + final Fetcher fetcher = new Fetcher(conf, uri, storeChunk); assertEquals(TajoProtos.FetcherState.FETCH_INIT, fetcher.getState()); pullServerService.stop(); http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-project/pom.xml ---------------------------------------------------------------------- diff --git a/tajo-project/pom.xml b/tajo-project/pom.xml index 30f864c..3820d50 100644 --- a/tajo-project/pom.xml +++ b/tajo-project/pom.xml @@ -37,6 +37,7 @@ <protobuf.version>2.5.0</protobuf.version> <tajo.version>0.10.0-SNAPSHOT</tajo.version> <hbase.version>0.98.7-hadoop2</hbase.version> + <netty.version>4.0.25.Final</netty.version> <tajo.root>${project.parent.relativePath}/..</tajo.root> <extra.source.path>src/main/hadoop-${hadoop.version}</extra.source.path> </properties> @@ -1024,13 +1025,28 @@ </dependency> <dependency> <groupId>io.netty</groupId> - <artifactId>netty</artifactId> - <version>3.6.6.Final</version> + <artifactId>netty-buffer</artifactId> + <version>${netty.version}</version> </dependency> <dependency> <groupId>io.netty</groupId> - <artifactId>netty-buffer</artifactId> - <version>4.0.24.Final</version> + <artifactId>netty-transport</artifactId> + <version>${netty.version}</version> + </dependency> + <dependency> + <groupId>io.netty</groupId> + <artifactId>netty-codec</artifactId> + <version>${netty.version}</version> + </dependency> + <dependency> + <groupId>io.netty</groupId> + <artifactId>netty-codec-http</artifactId> + <version>${netty.version}</version> + </dependency> + <dependency> + <groupId>io.netty</groupId> + <artifactId>netty-handler</artifactId> + <version>${netty.version}</version> </dependency> <dependency> <groupId>org.apache.derby</groupId> http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-pullserver/pom.xml ---------------------------------------------------------------------- diff --git a/tajo-pullserver/pom.xml b/tajo-pullserver/pom.xml index 6d13a3c..cdbda3e 100644 --- a/tajo-pullserver/pom.xml +++ b/tajo-pullserver/pom.xml @@ -47,6 +47,14 @@ <dependencies> <dependency> + <groupId>io.netty</groupId> + <artifactId>netty-transport</artifactId> + </dependency> + <dependency> + <groupId>io.netty</groupId> + <artifactId>netty-codec-http</artifactId> + </dependency> + <dependency> <groupId>org.apache.tajo</groupId> <artifactId>tajo-rpc</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FadvisedChunkedFile.java ---------------------------------------------------------------------- diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FadvisedChunkedFile.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FadvisedChunkedFile.java index b0b8d18..3df82e6 100644 --- a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FadvisedChunkedFile.java +++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FadvisedChunkedFile.java @@ -22,7 +22,10 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.io.ReadaheadPool; import org.apache.hadoop.io.nativeio.NativeIO; -import org.jboss.netty.handler.stream.ChunkedFile; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.stream.ChunkedFile; import java.io.FileDescriptor; import java.io.IOException; @@ -52,13 +55,13 @@ public class FadvisedChunkedFile extends ChunkedFile { } @Override - public Object nextChunk() throws Exception { + public ByteBuf readChunk(ChannelHandlerContext ctx) throws Exception { if (PullServerUtil.isNativeIOPossible() && manageOsCache && readaheadPool != null) { readaheadRequest = readaheadPool - .readaheadStream(identifier, fd, getCurrentOffset(), readaheadLength, - getEndOffset(), readaheadRequest); + .readaheadStream(identifier, fd, currentOffset(), readaheadLength, + endOffset(), readaheadRequest); } - return super.nextChunk(); + return super.readChunk(ctx); } @Override @@ -66,11 +69,11 @@ public class FadvisedChunkedFile extends ChunkedFile { if (readaheadRequest != null) { readaheadRequest.cancel(); } - if (PullServerUtil.isNativeIOPossible() && manageOsCache && getEndOffset() - getStartOffset() > 0) { + if (PullServerUtil.isNativeIOPossible() && manageOsCache && endOffset() - startOffset() > 0) { try { PullServerUtil.posixFadviseIfPossible(identifier, fd, - getStartOffset(), getEndOffset() - getStartOffset(), + startOffset(), endOffset() - startOffset(), NativeIO.POSIX.POSIX_FADV_DONTNEED); } catch (Throwable t) { LOG.warn("Failed to manage OS cache for " + identifier, t); http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FadvisedFileRegion.java ---------------------------------------------------------------------- diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FadvisedFileRegion.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FadvisedFileRegion.java index 18cf4b6..643d9e0 100644 --- a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FadvisedFileRegion.java +++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FadvisedFileRegion.java @@ -19,11 +19,13 @@ package org.apache.tajo.pullserver; import com.google.common.annotations.VisibleForTesting; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.io.ReadaheadPool; import org.apache.hadoop.io.nativeio.NativeIO; -import org.jboss.netty.channel.DefaultFileRegion; + +import io.netty.channel.DefaultFileRegion; import java.io.FileDescriptor; import java.io.IOException; @@ -79,8 +81,8 @@ public class FadvisedFileRegion extends DefaultFileRegion { throws IOException { if (PullServerUtil.isNativeIOPossible() && manageOsCache && readaheadPool != null) { readaheadRequest = readaheadPool.readaheadStream(identifier, fd, - getPosition() + position, readaheadLength, - getPosition() + getCount(), readaheadRequest); + position() + position, readaheadLength, + position() + count(), readaheadRequest); } if(this.shuffleTransferToAllowed) { @@ -146,11 +148,11 @@ public class FadvisedFileRegion extends DefaultFileRegion { @Override - public void releaseExternalResources() { + protected void deallocate() { if (readaheadRequest != null) { readaheadRequest.cancel(); } - super.releaseExternalResources(); + super.deallocate(); } /** @@ -158,9 +160,9 @@ public class FadvisedFileRegion extends DefaultFileRegion { * we don't need the region to be cached anymore. */ public void transferSuccessful() { - if (PullServerUtil.isNativeIOPossible() && manageOsCache && getCount() > 0) { + if (PullServerUtil.isNativeIOPossible() && manageOsCache && count() > 0 && super.isOpen()) { try { - PullServerUtil.posixFadviseIfPossible(identifier, fd, getPosition(), getCount(), + PullServerUtil.posixFadviseIfPossible(identifier, fd, position(), count(), NativeIO.POSIX.POSIX_FADV_DONTNEED); } catch (Throwable t) { LOG.warn("Failed to manage OS cache for " + identifier, t); http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FileCloseListener.java ---------------------------------------------------------------------- diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FileCloseListener.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FileCloseListener.java index 236db89..9c3c523 100644 --- a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FileCloseListener.java +++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/FileCloseListener.java @@ -18,10 +18,10 @@ package org.apache.tajo.pullserver; -import org.jboss.netty.channel.ChannelFuture; -import org.jboss.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelFuture; +import io.netty.util.concurrent.GenericFutureListener; -public class FileCloseListener implements ChannelFutureListener { +public class FileCloseListener implements GenericFutureListener<ChannelFuture> { private FadvisedFileRegion filePart; private String requestUri; @@ -45,7 +45,7 @@ public class FileCloseListener implements ChannelFutureListener { if(future.isSuccess()){ filePart.transferSuccessful(); } - filePart.releaseExternalResources(); + filePart.deallocate(); if (pullServerService != null) { pullServerService.completeFileChunk(filePart, requestUri, startTime); } http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerChannelInitializer.java ---------------------------------------------------------------------- diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerChannelInitializer.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerChannelInitializer.java new file mode 100644 index 0000000..8661ee5 --- /dev/null +++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerChannelInitializer.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.pullserver; + +import java.util.concurrent.TimeUnit; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPipeline; +import io.netty.handler.codec.http.HttpContentCompressor; +import io.netty.handler.codec.http.HttpRequestDecoder; +import io.netty.handler.codec.http.HttpResponseEncoder; +import io.netty.handler.stream.ChunkedWriteHandler; +import io.netty.handler.timeout.IdleStateHandler; + +public class HttpDataServerChannelInitializer extends ChannelInitializer<Channel> { + private String userName; + private String appId; + public HttpDataServerChannelInitializer(String userName, String appId) { + this.userName = userName; + this.appId = appId; + } + + @Override + protected void initChannel(Channel channel) throws Exception { + // Create a default pipeline implementation. + ChannelPipeline pipeline = channel.pipeline(); + + // Uncomment the following line if you want HTTPS + // SSLEngine engine = + // SecureChatSslContextFactory.getServerContext().createSSLEngine(); + // engine.setUseClientMode(false); + // pipeline.addLast("ssl", new SslHandler(engine)); + + pipeline.addLast("decoder", new HttpRequestDecoder()); + //pipeline.addLast("aggregator", new HttpChunkAggregator(65536)); + pipeline.addLast("encoder", new HttpResponseEncoder()); + pipeline.addLast("chunkedWriter", new ChunkedWriteHandler()); + pipeline.addLast("deflater", new HttpContentCompressor()); + pipeline.addLast("handler", new HttpDataServerHandler(userName, appId)); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerHandler.java ---------------------------------------------------------------------- diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerHandler.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerHandler.java index bfb70b4..472b967 100644 --- a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerHandler.java +++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerHandler.java @@ -19,19 +19,21 @@ package org.apache.tajo.pullserver; import com.google.common.collect.Lists; + +import io.netty.channel.*; +import io.netty.handler.codec.http.*; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer; import org.apache.tajo.ExecutionBlockId; import org.apache.tajo.pullserver.retriever.DataRetriever; import org.apache.tajo.pullserver.retriever.FileChunk; -import org.jboss.netty.buffer.ChannelBuffers; -import org.jboss.netty.channel.*; -import org.jboss.netty.handler.codec.frame.TooLongFrameException; -import org.jboss.netty.handler.codec.http.*; -import org.jboss.netty.handler.ssl.SslHandler; -import org.jboss.netty.handler.stream.ChunkedFile; -import org.jboss.netty.util.CharsetUtil; + +import io.netty.buffer.Unpooled; +import io.netty.handler.codec.TooLongFrameException; +import io.netty.handler.ssl.SslHandler; +import io.netty.handler.stream.ChunkedFile; +import io.netty.util.CharsetUtil; import java.io.*; import java.net.URLDecoder; @@ -41,14 +43,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE; -import static org.jboss.netty.handler.codec.http.HttpHeaders.isKeepAlive; -import static org.jboss.netty.handler.codec.http.HttpHeaders.setContentLength; -import static org.jboss.netty.handler.codec.http.HttpMethod.GET; -import static org.jboss.netty.handler.codec.http.HttpResponseStatus.*; -import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1; - -public class HttpDataServerHandler extends SimpleChannelUpstreamHandler { +public class HttpDataServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> { private final static Log LOG = LogFactory.getLog(HttpDataServerHandler.class); Map<ExecutionBlockId, DataRetriever> retrievers = @@ -62,21 +57,18 @@ public class HttpDataServerHandler extends SimpleChannelUpstreamHandler { } @Override - public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) + public void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception { - HttpRequest request = (HttpRequest) e.getMessage(); - if (request.getMethod() != GET) { - sendError(ctx, METHOD_NOT_ALLOWED); + + if (request.getMethod() != HttpMethod.GET) { + sendError(ctx, HttpResponseStatus.METHOD_NOT_ALLOWED); return; } - String base = - ContainerLocalizer.USERCACHE + "/" + userName + "/" - + ContainerLocalizer.APPCACHE + "/" - + appId + "/output" + "/"; + String base = ContainerLocalizer.USERCACHE + "/" + userName + "/" + ContainerLocalizer.APPCACHE + "/" + appId + + "/output" + "/"; - final Map<String, List<String>> params = - new QueryStringDecoder(request.getUri()).getParameters(); + final Map<String, List<String>> params = new QueryStringDecoder(request.getUri()).parameters(); List<FileChunk> chunks = Lists.newArrayList(); List<String> taskIds = splitMaps(params.get("ta")); @@ -90,65 +82,54 @@ public class HttpDataServerHandler extends SimpleChannelUpstreamHandler { } FileChunk[] file = chunks.toArray(new FileChunk[chunks.size()]); -// try { -// file = retriever.handle(ctx, request); -// } catch (FileNotFoundException fnf) { -// LOG.error(fnf); -// sendError(ctx, NOT_FOUND); -// return; -// } catch (IllegalArgumentException iae) { -// LOG.error(iae); -// sendError(ctx, BAD_REQUEST); -// return; -// } catch (FileAccessForbiddenException fafe) { -// LOG.error(fafe); -// sendError(ctx, FORBIDDEN); -// return; -// } catch (IOException ioe) { -// LOG.error(ioe); -// sendError(ctx, INTERNAL_SERVER_ERROR); -// return; -// } // Write the content. - Channel ch = e.getChannel(); if (file == null) { - HttpResponse response = new DefaultHttpResponse(HTTP_1_1, NO_CONTENT); - ch.write(response); - if (!isKeepAlive(request)) { - ch.close(); + HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NO_CONTENT); + if (!HttpHeaders.isKeepAlive(request)) { + ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE); + } else { + response.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE); + ctx.writeAndFlush(response); } - } else { - HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK); + } else { + HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); + ChannelFuture writeFuture = null; long totalSize = 0; for (FileChunk chunk : file) { totalSize += chunk.length(); } - setContentLength(response, totalSize); + HttpHeaders.setContentLength(response, totalSize); + if (HttpHeaders.isKeepAlive(request)) { + response.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE); + } // Write the initial line and the header. - ch.write(response); - - ChannelFuture writeFuture = null; + writeFuture = ctx.write(response); for (FileChunk chunk : file) { - writeFuture = sendFile(ctx, ch, chunk); + writeFuture = sendFile(ctx, chunk); if (writeFuture == null) { - sendError(ctx, NOT_FOUND); + sendError(ctx, HttpResponseStatus.NOT_FOUND); return; } } + if (ctx.pipeline().get(SslHandler.class) == null) { + writeFuture = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT); + } else { + ctx.flush(); + } // Decide whether to close the connection or not. - if (!isKeepAlive(request)) { + if (!HttpHeaders.isKeepAlive(request)) { // Close the connection when the whole content is written out. writeFuture.addListener(ChannelFutureListener.CLOSE); } } + } private ChannelFuture sendFile(ChannelHandlerContext ctx, - Channel ch, FileChunk file) throws IOException { RandomAccessFile raf; try { @@ -158,38 +139,41 @@ public class HttpDataServerHandler extends SimpleChannelUpstreamHandler { } ChannelFuture writeFuture; - if (ch.getPipeline().get(SslHandler.class) != null) { + ChannelFuture lastContentFuture; + if (ctx.pipeline().get(SslHandler.class) != null) { // Cannot use zero-copy with HTTPS. - writeFuture = ch.write(new ChunkedFile(raf, file.startOffset(), - file.length(), 8192)); + lastContentFuture = ctx.write(new HttpChunkedInput(new ChunkedFile(raf, file.startOffset(), + file.length(), 8192))); } else { // No encryption - use zero-copy. final FileRegion region = new DefaultFileRegion(raf.getChannel(), file.startOffset(), file.length()); - writeFuture = ch.write(region); + writeFuture = ctx.write(region); + lastContentFuture = ctx.write(LastHttpContent.EMPTY_LAST_CONTENT); writeFuture.addListener(new ChannelFutureListener() { public void operationComplete(ChannelFuture future) { - region.releaseExternalResources(); + if (region.refCnt() > 0) { + region.release(); + } } }); } - return writeFuture; + return lastContentFuture; } @Override - public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { - Channel ch = e.getChannel(); - Throwable cause = e.getCause(); + Channel ch = ctx.channel(); if (cause instanceof TooLongFrameException) { - sendError(ctx, BAD_REQUEST); + sendError(ctx, HttpResponseStatus.BAD_REQUEST); return; } - cause.printStackTrace(); - if (ch.isConnected()) { - sendError(ctx, INTERNAL_SERVER_ERROR); + LOG.error(cause.getMessage(), cause); + if (ch.isActive()) { + sendError(ctx, HttpResponseStatus.INTERNAL_SERVER_ERROR); } } @@ -221,13 +205,12 @@ public class HttpDataServerHandler extends SimpleChannelUpstreamHandler { } private void sendError(ChannelHandlerContext ctx, HttpResponseStatus status) { - HttpResponse response = new DefaultHttpResponse(HTTP_1_1, status); - response.setHeader(CONTENT_TYPE, "text/plain; charset=UTF-8"); - response.setContent(ChannelBuffers.copiedBuffer( - "Failure: " + status.toString() + "\r\n", CharsetUtil.UTF_8)); + FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status, + Unpooled.copiedBuffer("Failure: " + status.toString() + "\r\n", CharsetUtil.UTF_8)); + response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain; charset=UTF-8"); // Close the connection as soon as the error message is sent. - ctx.getChannel().write(response).addListener(ChannelFutureListener.CLOSE); + ctx.channel().writeAndFlush(response).addListener(ChannelFutureListener.CLOSE); } private List<String> splitMaps(List<String> qids) { http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerPipelineFactory.java ---------------------------------------------------------------------- diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerPipelineFactory.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerPipelineFactory.java deleted file mode 100644 index 4c8bd8b..0000000 --- a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/HttpDataServerPipelineFactory.java +++ /dev/null @@ -1,56 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.pullserver; - -import org.jboss.netty.channel.ChannelPipeline; -import org.jboss.netty.channel.ChannelPipelineFactory; -import org.jboss.netty.handler.codec.http.HttpContentCompressor; -import org.jboss.netty.handler.codec.http.HttpRequestDecoder; -import org.jboss.netty.handler.codec.http.HttpResponseEncoder; -import org.jboss.netty.handler.stream.ChunkedWriteHandler; - -import static org.jboss.netty.channel.Channels.pipeline; - -public class HttpDataServerPipelineFactory implements ChannelPipelineFactory { - private String userName; - private String appId; - public HttpDataServerPipelineFactory(String userName, String appId) { - this.userName = userName; - this.appId = appId; - } - - public ChannelPipeline getPipeline() throws Exception { - // Create a default pipeline implementation. - ChannelPipeline pipeline = pipeline(); - - // Uncomment the following line if you want HTTPS - // SSLEngine engine = - // SecureChatSslContextFactory.getServerContext().createSSLEngine(); - // engine.setUseClientMode(false); - // pipeline.addLast("ssl", new SslHandler(engine)); - - pipeline.addLast("decoder", new HttpRequestDecoder()); - //pipeline.addLast("aggregator", new HttpChunkAggregator(65536)); - pipeline.addLast("encoder", new HttpResponseEncoder()); - pipeline.addLast("chunkedWriter", new ChunkedWriteHandler()); - pipeline.addLast("deflater", new HttpContentCompressor()); - pipeline.addLast("handler", new HttpDataServerHandler(userName, appId)); - return pipeline; - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerAuxService.java ---------------------------------------------------------------------- diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerAuxService.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerAuxService.java index d633058..ce4018b 100644 --- a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerAuxService.java +++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerAuxService.java @@ -19,7 +19,22 @@ package org.apache.tajo.pullserver; import com.google.common.collect.Lists; -import com.google.common.util.concurrent.ThreadFactoryBuilder; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.buffer.Unpooled; +import io.netty.channel.*; +import io.netty.channel.group.ChannelGroup; +import io.netty.channel.group.DefaultChannelGroup; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.codec.TooLongFrameException; +import io.netty.handler.codec.http.*; +import io.netty.handler.ssl.SslHandler; +import io.netty.handler.stream.ChunkedWriteHandler; +import io.netty.util.CharsetUtil; +import io.netty.util.concurrent.GenericFutureListener; +import io.netty.util.concurrent.GlobalEventExecutor; + import org.apache.commons.codec.binary.Base64; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -48,23 +63,13 @@ import org.apache.tajo.catalog.Schema; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.conf.TajoConf.ConfVars; import org.apache.tajo.pullserver.retriever.FileChunk; +import org.apache.tajo.rpc.RpcChannelFactory; import org.apache.tajo.storage.RowStoreUtil; import org.apache.tajo.storage.RowStoreUtil.RowStoreDecoder; import org.apache.tajo.storage.Tuple; import org.apache.tajo.storage.TupleComparator; import org.apache.tajo.storage.index.bst.BSTIndex; import org.apache.tajo.util.TajoIdUtils; -import org.jboss.netty.bootstrap.ServerBootstrap; -import org.jboss.netty.buffer.ChannelBuffers; -import org.jboss.netty.channel.*; -import org.jboss.netty.channel.group.ChannelGroup; -import org.jboss.netty.channel.group.DefaultChannelGroup; -import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory; -import org.jboss.netty.handler.codec.frame.TooLongFrameException; -import org.jboss.netty.handler.codec.http.*; -import org.jboss.netty.handler.ssl.SslHandler; -import org.jboss.netty.handler.stream.ChunkedWriteHandler; -import org.jboss.netty.util.CharsetUtil; import java.io.File; import java.io.FileNotFoundException; @@ -78,16 +83,6 @@ import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Executors; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; - -import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE; -import static org.jboss.netty.handler.codec.http.HttpHeaders.isKeepAlive; -import static org.jboss.netty.handler.codec.http.HttpHeaders.setContentLength; -import static org.jboss.netty.handler.codec.http.HttpMethod.GET; -import static org.jboss.netty.handler.codec.http.HttpResponseStatus.*; -import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1; public class PullServerAuxService extends AuxiliaryService { @@ -100,9 +95,9 @@ public class PullServerAuxService extends AuxiliaryService { public static final int DEFAULT_SHUFFLE_READAHEAD_BYTES = 4 * 1024 * 1024; private int port; - private ChannelFactory selector; - private final ChannelGroup accepted = new DefaultChannelGroup(); - private HttpPipelineFactory pipelineFact; + private ServerBootstrap selector; + private final ChannelGroup accepted = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); + private HttpChannelInitializer initializer; private int sslFileBufferSize; private ApplicationId appId; @@ -130,7 +125,7 @@ public class PullServerAuxService extends AuxiliaryService { public static final int DEFAULT_SUFFLE_SSL_FILE_BUFFER_SIZE = 60 * 1024; @Metrics(name="PullServerShuffleMetrics", about="PullServer output metrics", context="tajo") - static class ShuffleMetrics implements ChannelFutureListener { + static class ShuffleMetrics implements GenericFutureListener<ChannelFuture> { @Metric({"OutputBytes","PullServer output in bytes"}) MutableCounterLong shuffleOutputBytes; @Metric({"Failed","# of failed shuffle outputs"}) @@ -211,16 +206,10 @@ public class PullServerAuxService extends AuxiliaryService { readaheadLength = conf.getInt(SHUFFLE_READAHEAD_BYTES, DEFAULT_SHUFFLE_READAHEAD_BYTES); - ThreadFactory bossFactory = new ThreadFactoryBuilder() - .setNameFormat("PullServerAuxService Netty Boss #%d") - .build(); - ThreadFactory workerFactory = new ThreadFactoryBuilder() - .setNameFormat("PullServerAuxService Netty Worker #%d") - .build(); - - selector = new NioServerSocketChannelFactory( - Executors.newCachedThreadPool(bossFactory), - Executors.newCachedThreadPool(workerFactory)); + selector = RpcChannelFactory.createServerChannelFactory("PullServerAuxService", 0) + .option(ChannelOption.TCP_NODELAY, true) + .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) + .childOption(ChannelOption.TCP_NODELAY, true); localFS = new LocalFileSystem(); super.init(new Configuration(conf)); @@ -233,20 +222,23 @@ public class PullServerAuxService extends AuxiliaryService { @Override public synchronized void start() { Configuration conf = getConfig(); - ServerBootstrap bootstrap = new ServerBootstrap(selector); + ServerBootstrap bootstrap = selector.clone(); try { - pipelineFact = new HttpPipelineFactory(conf); + initializer = new HttpChannelInitializer(conf); } catch (Exception ex) { throw new RuntimeException(ex); } - bootstrap.setPipelineFactory(pipelineFact); + bootstrap.channel(NioServerSocketChannel.class) + .handler(initializer); port = conf.getInt(ConfVars.PULLSERVER_PORT.varname, ConfVars.PULLSERVER_PORT.defaultIntVal); - Channel ch = bootstrap.bind(new InetSocketAddress(port)); - accepted.add(ch); - port = ((InetSocketAddress)ch.getLocalAddress()).getPort(); + ChannelFuture future = bootstrap.bind(new InetSocketAddress(port)) + .addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE) + .syncUninterruptibly(); + accepted.add(future.channel()); + port = ((InetSocketAddress)future.channel().localAddress()).getPort(); conf.set(ConfVars.PULLSERVER_PORT.varname, Integer.toString(port)); - pipelineFact.PullServer.setPort(port); + initializer.PullServer.setPort(port); LOG.info(getName() + " listening on port " + port); super.start(); @@ -261,10 +253,19 @@ public class PullServerAuxService extends AuxiliaryService { @Override public synchronized void stop() { try { - accepted.close().awaitUninterruptibly(10, TimeUnit.SECONDS); - ServerBootstrap bootstrap = new ServerBootstrap(selector); - bootstrap.releaseExternalResources(); - pipelineFact.destroy(); + accepted.close(); + if (selector != null) { + if (selector.group() != null) { + selector.group().shutdownGracefully(); + } + if (selector.childGroup() != null) { + selector.childGroup().shutdownGracefully(); + } + } + + if (initializer != null) { + initializer.destroy(); + } localFS.close(); } catch (Throwable t) { @@ -285,12 +286,12 @@ public class PullServerAuxService extends AuxiliaryService { } } - class HttpPipelineFactory implements ChannelPipelineFactory { + class HttpChannelInitializer extends ChannelInitializer<Channel> { final PullServer PullServer; private SSLFactory sslFactory; - public HttpPipelineFactory(Configuration conf) throws Exception { + public HttpChannelInitializer(Configuration conf) throws Exception { PullServer = new PullServer(conf); if (conf.getBoolean(ConfVars.SHUFFLE_SSL_ENABLED_KEY.varname, ConfVars.SHUFFLE_SSL_ENABLED_KEY.defaultBoolVal)) { @@ -306,24 +307,25 @@ public class PullServerAuxService extends AuxiliaryService { } @Override - public ChannelPipeline getPipeline() throws Exception { - ChannelPipeline pipeline = Channels.pipeline(); + protected void initChannel(Channel channel) throws Exception { + ChannelPipeline pipeline = channel.pipeline(); if (sslFactory != null) { pipeline.addLast("ssl", new SslHandler(sslFactory.createSSLEngine())); } - pipeline.addLast("decoder", new HttpRequestDecoder()); - pipeline.addLast("aggregator", new HttpChunkAggregator(1 << 16)); + pipeline.addLast("encoder", new HttpResponseEncoder()); + pipeline.addLast("decoder", new HttpRequestDecoder()); + pipeline.addLast("aggregator", new HttpObjectAggregator(1 << 16)); pipeline.addLast("chunking", new ChunkedWriteHandler()); pipeline.addLast("shuffle", PullServer); - return pipeline; // TODO factor security manager into pipeline // TODO factor out encode/decode to permit binary shuffle // TODO factor out decode of index to permit alt. models } } - class PullServer extends SimpleChannelUpstreamHandler { + @ChannelHandler.Sharable + class PullServer extends SimpleChannelInboundHandler<FullHttpRequest> { private final Configuration conf; private final LocalDirAllocator lDirAlloc = new LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname); private int port; @@ -349,33 +351,27 @@ public class PullServerAuxService extends AuxiliaryService { } @Override - public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) + public void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception { - - HttpRequest request = (HttpRequest) e.getMessage(); - if (request.getMethod() != GET) { - sendError(ctx, METHOD_NOT_ALLOWED); + if (request.getMethod() != HttpMethod.GET) { + sendError(ctx, HttpResponseStatus.METHOD_NOT_ALLOWED); return; } // Parsing the URL into key-values - final Map<String, List<String>> params = - new QueryStringDecoder(request.getUri()).getParameters(); + final Map<String, List<String>> params = new QueryStringDecoder(request.getUri()).parameters(); final List<String> types = params.get("type"); final List<String> taskIdList = params.get("ta"); final List<String> stageIds = params.get("sid"); final List<String> partitionIds = params.get("p"); - if (types == null || taskIdList == null || stageIds == null - || partitionIds == null) { - sendError(ctx, "Required type, taskIds, stage Id, and partition id", - BAD_REQUEST); + if (types == null || taskIdList == null || stageIds == null || partitionIds == null) { + sendError(ctx, "Required type, taskIds, stage Id, and partition id", HttpResponseStatus.BAD_REQUEST); return; } if (types.size() != 1 || stageIds.size() != 1) { - sendError(ctx, "Required type, taskIds, stage Id, and partition id", - BAD_REQUEST); + sendError(ctx, "Required type, taskIds, stage Id, and partition id", HttpResponseStatus.BAD_REQUEST); return; } @@ -389,12 +385,11 @@ public class PullServerAuxService extends AuxiliaryService { // the working dir of tajo worker for each query String queryBaseDir = queryId + "/output" + "/"; - LOG.info("PullServer request param: repartitionType=" + repartitionType + - ", sid=" + sid + ", partitionId=" + partitionId + ", taskIds=" + taskIdList); + LOG.info("PullServer request param: repartitionType=" + repartitionType + ", sid=" + sid + ", partitionId=" + + partitionId + ", taskIds=" + taskIdList); String taskLocalDir = conf.get(ConfVars.WORKER_TEMPORAL_DIR.varname); - if (taskLocalDir == null || - taskLocalDir.equals("")) { + if (taskLocalDir == null || taskLocalDir.equals("")) { LOG.error("Tajo local directory should be specified."); } LOG.info("PullServer baseDir: " + taskLocalDir + "/" + queryBaseDir); @@ -402,9 +397,8 @@ public class PullServerAuxService extends AuxiliaryService { // if a stage requires a range partitioning if (repartitionType.equals("r")) { String ta = taskIds.get(0); - Path path = localFS.makeQualified( - lDirAlloc.getLocalPathToRead(queryBaseDir + "/" + sid + "/" - + ta + "/output/", conf)); + Path path = localFS.makeQualified(lDirAlloc.getLocalPathToRead(queryBaseDir + "/" + sid + "/" + ta + + "/output/", conf)); String startKey = params.get("start").get(0); String endKey = params.get("end").get(0); @@ -415,19 +409,19 @@ public class PullServerAuxService extends AuxiliaryService { chunk = getFileCunks(path, startKey, endKey, last); } catch (Throwable t) { LOG.error("ERROR Request: " + request.getUri(), t); - sendError(ctx, "Cannot get file chunks to be sent", BAD_REQUEST); + sendError(ctx, "Cannot get file chunks to be sent", HttpResponseStatus.BAD_REQUEST); return; } if (chunk != null) { chunks.add(chunk); } - // if a stage requires a hash repartition or a scattered hash repartition + // if a stage requires a hash repartition or a scattered hash + // repartition } else if (repartitionType.equals("h") || repartitionType.equals("s")) { for (String ta : taskIds) { - Path path = localFS.makeQualified( - lDirAlloc.getLocalPathToRead(queryBaseDir + "/" + sid + "/" + - ta + "/output/" + partitionId, conf)); + Path path = localFS.makeQualified(lDirAlloc.getLocalPathToRead(queryBaseDir + "/" + sid + "/" + ta + + "/output/" + partitionId, conf)); File file = new File(path.toUri()); FileChunk chunk = new FileChunk(file, 0, file.length()); chunks.add(chunk); @@ -438,45 +432,54 @@ public class PullServerAuxService extends AuxiliaryService { } // Write the content. - Channel ch = e.getChannel(); if (chunks.size() == 0) { - HttpResponse response = new DefaultHttpResponse(HTTP_1_1, NO_CONTENT); - ch.write(response); - if (!isKeepAlive(request)) { - ch.close(); + HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NO_CONTENT); + + if (!HttpHeaders.isKeepAlive(request)) { + ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE); + } else { + response.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE); + ctx.writeAndFlush(response); } - } else { + } else { FileChunk[] file = chunks.toArray(new FileChunk[chunks.size()]); - HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK); + ChannelFuture writeFuture = null; + HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); long totalSize = 0; for (FileChunk chunk : file) { totalSize += chunk.length(); } - setContentLength(response, totalSize); + HttpHeaders.setContentLength(response, totalSize); + if (HttpHeaders.isKeepAlive(request)) { + response.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE); + } // Write the initial line and the header. - ch.write(response); - - ChannelFuture writeFuture = null; + writeFuture = ctx.write(response); for (FileChunk chunk : file) { - writeFuture = sendFile(ctx, ch, chunk); + writeFuture = sendFile(ctx, chunk); if (writeFuture == null) { - sendError(ctx, NOT_FOUND); + sendError(ctx, HttpResponseStatus.NOT_FOUND); return; } } + if (ctx.pipeline().get(SslHandler.class) == null) { + writeFuture = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT); + } else { + ctx.flush(); + } // Decide whether to close the connection or not. - if (!isKeepAlive(request)) { + if (!HttpHeaders.isKeepAlive(request)) { // Close the connection when the whole content is written out. writeFuture.addListener(ChannelFutureListener.CLOSE); } } + } private ChannelFuture sendFile(ChannelHandlerContext ctx, - Channel ch, FileChunk file) throws IOException { RandomAccessFile spill; try { @@ -485,26 +488,27 @@ public class PullServerAuxService extends AuxiliaryService { LOG.info(file.getFile() + " not found"); return null; } - ChannelFuture writeFuture; - if (ch.getPipeline().get(SslHandler.class) == null) { + + ChannelFuture lastContentFuture; + if (ctx.pipeline().get(SslHandler.class) == null) { final FadvisedFileRegion partition = new FadvisedFileRegion(spill, file.startOffset(), file.length(), manageOsCache, readaheadLength, readaheadPool, file.getFile().getAbsolutePath()); - writeFuture = ch.write(partition); - writeFuture.addListener(new FileCloseListener(partition, null, 0, null)); + lastContentFuture = ctx.write(partition); + lastContentFuture.addListener(new FileCloseListener(partition, null, 0, null)); } else { // HTTPS cannot be done with zero copy. final FadvisedChunkedFile chunk = new FadvisedChunkedFile(spill, file.startOffset(), file.length(), sslFileBufferSize, manageOsCache, readaheadLength, readaheadPool, file.getFile().getAbsolutePath()); - writeFuture = ch.write(chunk); + lastContentFuture = ctx.write(new HttpChunkedInput(chunk)); } metrics.shuffleConnections.incr(); metrics.shuffleOutputBytes.incr(file.length()); // optimistic - return writeFuture; + return lastContentFuture; } - + private void sendError(ChannelHandlerContext ctx, HttpResponseStatus status) { sendError(ctx, "", status); @@ -512,29 +516,26 @@ public class PullServerAuxService extends AuxiliaryService { private void sendError(ChannelHandlerContext ctx, String message, HttpResponseStatus status) { - HttpResponse response = new DefaultHttpResponse(HTTP_1_1, status); - response.setHeader(CONTENT_TYPE, "text/plain; charset=UTF-8"); - response.setContent( - ChannelBuffers.copiedBuffer(message, CharsetUtil.UTF_8)); + FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status, + Unpooled.copiedBuffer(message, CharsetUtil.UTF_8)); + response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain; charset=UTF-8"); // Close the connection as soon as the error message is sent. - ctx.getChannel().write(response).addListener(ChannelFutureListener.CLOSE); + ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE); } @Override - public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { - Channel ch = e.getChannel(); - Throwable cause = e.getCause(); + Channel ch = ctx.channel(); if (cause instanceof TooLongFrameException) { - sendError(ctx, BAD_REQUEST); + sendError(ctx, HttpResponseStatus.BAD_REQUEST); return; } LOG.error("PullServer error: ", cause); - if (ch.isConnected()) { - LOG.error("PullServer error " + e); - sendError(ctx, INTERNAL_SERVER_ERROR); + if (ch.isActive()) { + sendError(ctx, HttpResponseStatus.INTERNAL_SERVER_ERROR); } } }
