http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java ---------------------------------------------------------------------- diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java index 860bc8e..f0dcd26 100644 --- a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java +++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java @@ -19,6 +19,10 @@ package org.apache.tajo.pullserver; import com.google.common.collect.Lists; + +import io.netty.channel.*; +import io.netty.channel.socket.SocketChannel; +import io.netty.handler.codec.http.*; import org.apache.commons.codec.binary.Base64; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; @@ -53,15 +57,18 @@ import org.apache.tajo.storage.RowStoreUtil.RowStoreDecoder; import org.apache.tajo.storage.Tuple; import org.apache.tajo.storage.TupleComparator; import org.apache.tajo.storage.index.bst.BSTIndex; -import org.jboss.netty.bootstrap.ServerBootstrap; -import org.jboss.netty.buffer.ChannelBuffers; -import org.jboss.netty.channel.*; -import org.jboss.netty.channel.group.ChannelGroup; -import org.jboss.netty.channel.group.DefaultChannelGroup; -import org.jboss.netty.handler.codec.http.*; -import org.jboss.netty.handler.ssl.SslHandler; -import org.jboss.netty.handler.stream.ChunkedWriteHandler; -import org.jboss.netty.util.CharsetUtil; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.buffer.Unpooled; +import io.netty.channel.group.ChannelGroup; +import io.netty.channel.group.DefaultChannelGroup; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.ssl.SslHandler; +import io.netty.handler.stream.ChunkedWriteHandler; +import io.netty.util.CharsetUtil; +import io.netty.util.concurrent.GenericFutureListener; +import io.netty.util.concurrent.GlobalEventExecutor; import java.io.*; import java.net.InetSocketAddress; @@ -72,16 +79,8 @@ import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE; -import static org.jboss.netty.handler.codec.http.HttpHeaders.isKeepAlive; -import static org.jboss.netty.handler.codec.http.HttpHeaders.setContentLength; -import static org.jboss.netty.handler.codec.http.HttpMethod.GET; -import static org.jboss.netty.handler.codec.http.HttpResponseStatus.*; -import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1; - public class TajoPullServerService extends AbstractService { private static final Log LOG = LogFactory.getLog(TajoPullServerService.class); @@ -93,9 +92,9 @@ public class TajoPullServerService extends AbstractService { public static final int DEFAULT_SHUFFLE_READAHEAD_BYTES = 4 * 1024 * 1024; private int port; - private ChannelFactory selector; - private final ChannelGroup accepted = new DefaultChannelGroup(); - private HttpPipelineFactory pipelineFact; + private ServerBootstrap selector; + private final ChannelGroup accepted = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); + private HttpChannelInitializer channelInitializer; private int sslFileBufferSize; private ApplicationId appId; @@ -131,7 +130,7 @@ public class TajoPullServerService extends AbstractService { } @Metrics(name="PullServerShuffleMetrics", about="PullServer output metrics", context="tajo") - static class ShuffleMetrics implements ChannelFutureListener { + static class ShuffleMetrics implements GenericFutureListener<ChannelFuture> { @Metric({"OutputBytes","PullServer output in bytes"}) MutableCounterLong shuffleOutputBytes; @Metric({"Failed","# of failed shuffle outputs"}) @@ -212,7 +211,10 @@ public class TajoPullServerService extends AbstractService { int workerNum = conf.getInt("tajo.shuffle.rpc.server.worker-thread-num", Runtime.getRuntime().availableProcessors() * 2); - selector = RpcChannelFactory.createServerChannelFactory("PullServerAuxService", workerNum); + selector = RpcChannelFactory.createServerChannelFactory("PullServerAuxService", workerNum) + .option(ChannelOption.TCP_NODELAY, true) + .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) + .childOption(ChannelOption.TCP_NODELAY, true); localFS = new LocalFileSystem(); @@ -228,23 +230,26 @@ public class TajoPullServerService extends AbstractService { // TODO change AbstractService to throw InterruptedException @Override public synchronized void serviceInit(Configuration conf) throws Exception { - ServerBootstrap bootstrap = new ServerBootstrap(selector); + ServerBootstrap bootstrap = selector.clone(); try { - pipelineFact = new HttpPipelineFactory(conf); + channelInitializer = new HttpChannelInitializer(conf); } catch (Exception ex) { throw new RuntimeException(ex); } - bootstrap.setPipelineFactory(pipelineFact); + bootstrap.childHandler(channelInitializer) + .channel(NioServerSocketChannel.class); port = conf.getInt(ConfVars.PULLSERVER_PORT.varname, ConfVars.PULLSERVER_PORT.defaultIntVal); - Channel ch = bootstrap.bind(new InetSocketAddress(port)); + ChannelFuture future = bootstrap.bind(new InetSocketAddress(port)) + .addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE) + .syncUninterruptibly(); - accepted.add(ch); - port = ((InetSocketAddress)ch.getLocalAddress()).getPort(); + accepted.add(future.channel()); + port = ((InetSocketAddress)future.channel().localAddress()).getPort(); conf.set(ConfVars.PULLSERVER_PORT.varname, Integer.toString(port)); - pipelineFact.PullServer.setPort(port); + channelInitializer.PullServer.setPort(port); LOG.info(getName() + " listening on port " + port); sslFileBufferSize = conf.getInt(SUFFLE_SSL_FILE_BUFFER_SIZE_KEY, @@ -314,10 +319,19 @@ public class TajoPullServerService extends AbstractService { @Override public synchronized void stop() { try { - accepted.close().awaitUninterruptibly(10, TimeUnit.SECONDS); - ServerBootstrap bootstrap = new ServerBootstrap(selector); - bootstrap.releaseExternalResources(); - pipelineFact.destroy(); + accepted.close(); + if (selector != null) { + if (selector.group() != null) { + selector.group().shutdownGracefully(); + } + if (selector.childGroup() != null) { + selector.childGroup().shutdownGracefully(); + } + } + + if (channelInitializer != null) { + channelInitializer.destroy(); + } localFS.close(); } catch (Throwable t) { @@ -337,12 +351,12 @@ public class TajoPullServerService extends AbstractService { } } - class HttpPipelineFactory implements ChannelPipelineFactory { + class HttpChannelInitializer extends ChannelInitializer<SocketChannel> { final PullServer PullServer; private SSLFactory sslFactory; - public HttpPipelineFactory(Configuration conf) throws Exception { + public HttpChannelInitializer(Configuration conf) throws Exception { PullServer = new PullServer(conf); if (conf.getBoolean(ConfVars.SHUFFLE_SSL_ENABLED_KEY.varname, ConfVars.SHUFFLE_SSL_ENABLED_KEY.defaultBoolVal)) { @@ -358,8 +372,8 @@ public class TajoPullServerService extends AbstractService { } @Override - public ChannelPipeline getPipeline() throws Exception { - ChannelPipeline pipeline = Channels.pipeline(); + protected void initChannel(SocketChannel channel) throws Exception { + ChannelPipeline pipeline = channel.pipeline(); if (sslFactory != null) { pipeline.addLast("ssl", new SslHandler(sslFactory.createSSLEngine())); } @@ -367,10 +381,9 @@ public class TajoPullServerService extends AbstractService { int maxChunkSize = getConfig().getInt(ConfVars.SHUFFLE_FETCHER_CHUNK_MAX_SIZE.varname, ConfVars.SHUFFLE_FETCHER_CHUNK_MAX_SIZE.defaultIntVal); pipeline.addLast("codec", new HttpServerCodec(4096, 8192, maxChunkSize)); - pipeline.addLast("aggregator", new HttpChunkAggregator(1 << 16)); + pipeline.addLast("aggregator", new HttpObjectAggregator(1 << 16)); pipeline.addLast("chunking", new ChunkedWriteHandler()); pipeline.addLast("shuffle", PullServer); - return pipeline; // TODO factor security manager into pipeline // TODO factor out encode/decode to permit binary shuffle // TODO factor out decode of index to permit alt. models @@ -408,31 +421,31 @@ public class TajoPullServerService extends AbstractService { this.numFiles = numFiles; this.remainFiles = new AtomicInteger(numFiles); } - public void decrementRemainFiles(FileRegion filePart, long fileStartTime) { - synchronized(remainFiles) { - long fileSendTime = System.currentTimeMillis() - fileStartTime; - if (fileSendTime > 20 * 1000) { - LOG.info("PullServer send too long time: filePos=" + filePart.getPosition() + ", fileLen=" + filePart.getCount()); - numSlowFile++; - } - if (fileSendTime > maxTime) { - maxTime = fileSendTime; - } - if (fileSendTime < minTime) { - minTime = fileSendTime; - } - int remain = remainFiles.decrementAndGet(); - if (remain <= 0) { - processingStatusMap.remove(requestUri); - LOG.info("PullServer processing status: totalTime=" + (System.currentTimeMillis() - startTime) + " ms, " + - "makeFileListTime=" + makeFileListTime + " ms, minTime=" + minTime + " ms, maxTime=" + maxTime + " ms, " + - "numFiles=" + numFiles + ", numSlowFile=" + numSlowFile); - } + + public synchronized void decrementRemainFiles(FileRegion filePart, long fileStartTime) { + long fileSendTime = System.currentTimeMillis() - fileStartTime; + if (fileSendTime > 20 * 1000) { + LOG.info("PullServer send too long time: filePos=" + filePart.position() + ", fileLen=" + filePart.count()); + numSlowFile++; + } + if (fileSendTime > maxTime) { + maxTime = fileSendTime; + } + if (fileSendTime < minTime) { + minTime = fileSendTime; + } + int remain = remainFiles.decrementAndGet(); + if (remain <= 0) { + processingStatusMap.remove(requestUri); + LOG.info("PullServer processing status: totalTime=" + (System.currentTimeMillis() - startTime) + " ms, " + + "makeFileListTime=" + makeFileListTime + " ms, minTime=" + minTime + " ms, maxTime=" + maxTime + " ms, " + + "numFiles=" + numFiles + ", numSlowFile=" + numSlowFile); } } } - class PullServer extends SimpleChannelUpstreamHandler { + @ChannelHandler.Sharable + class PullServer extends SimpleChannelInboundHandler<FullHttpRequest> { private final Configuration conf; // private final IndexCache indexCache; @@ -466,69 +479,58 @@ public class TajoPullServerService extends AbstractService { } @Override - public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent evt) - throws Exception { - - accepted.add(evt.getChannel()); + public void channelRegistered(ChannelHandlerContext ctx) throws Exception { + accepted.add(ctx.channel()); LOG.info(String.format("Current number of shuffle connections (%d)", accepted.size())); - super.channelOpen(ctx, evt); - + super.channelRegistered(ctx); } @Override - public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) - throws Exception { + public void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) + throws Exception { - HttpRequest request = (HttpRequest) e.getMessage(); - if (request.getMethod() != GET) { - sendError(ctx, METHOD_NOT_ALLOWED); + if (request.getMethod() != HttpMethod.GET) { + sendError(ctx, HttpResponseStatus.METHOD_NOT_ALLOWED); return; } ProcessingStatus processingStatus = new ProcessingStatus(request.getUri().toString()); processingStatusMap.put(request.getUri().toString(), processingStatus); // Parsing the URL into key-values - final Map<String, List<String>> params = - new QueryStringDecoder(request.getUri()).getParameters(); + final Map<String, List<String>> params = new QueryStringDecoder(request.getUri()).parameters(); final List<String> types = params.get("type"); final List<String> qids = params.get("qid"); final List<String> taskIdList = params.get("ta"); - final List<String> stageIds = params.get("sid"); + final List<String> subQueryIds = params.get("sid"); final List<String> partIds = params.get("p"); final List<String> offsetList = params.get("offset"); final List<String> lengthList = params.get("length"); - if (types == null || stageIds == null || qids == null || partIds == null) { - sendError(ctx, "Required queryId, type, stage Id, and part id", - BAD_REQUEST); + if (types == null || subQueryIds == null || qids == null || partIds == null) { + sendError(ctx, "Required queryId, type, subquery Id, and part id", HttpResponseStatus.BAD_REQUEST); return; } - if (qids.size() != 1 && types.size() != 1 || stageIds.size() != 1) { - sendError(ctx, "Required qids, type, taskIds, stage Id, and part id", - BAD_REQUEST); + if (qids.size() != 1 && types.size() != 1 || subQueryIds.size() != 1) { + sendError(ctx, "Required qids, type, taskIds, subquery Id, and part id", HttpResponseStatus.BAD_REQUEST); return; } String partId = partIds.get(0); String queryId = qids.get(0); String shuffleType = types.get(0); - String sid = stageIds.get(0); + String sid = subQueryIds.get(0); long offset = (offsetList != null && !offsetList.isEmpty()) ? Long.parseLong(offsetList.get(0)) : -1L; long length = (lengthList != null && !lengthList.isEmpty()) ? Long.parseLong(lengthList.get(0)) : -1L; - if (!shuffleType.equals("h") && !shuffleType.equals("s") && taskIdList == null) { - sendError(ctx, "Required taskIds", BAD_REQUEST); - } - List<String> taskIds = splitMaps(taskIdList); String queryBaseDir = queryId.toString() + "/output"; if (LOG.isDebugEnabled()) { - LOG.debug("PullServer request param: shuffleType=" + shuffleType + - ", sid=" + sid + ", partId=" + partId + ", taskIds=" + taskIdList); + LOG.debug("PullServer request param: shuffleType=" + shuffleType + ", sid=" + sid + ", partId=" + partId + + ", taskIds=" + taskIdList); // the working dir of tajo worker for each query LOG.debug("PullServer baseDir: " + conf.get(ConfVars.WORKER_TEMPORAL_DIR.varname) + "/" + queryBaseDir); @@ -539,13 +541,14 @@ public class TajoPullServerService extends AbstractService { // if a stage requires a range shuffle if (shuffleType.equals("r")) { String ta = taskIds.get(0); - if(!lDirAlloc.ifExists(queryBaseDir + "/" + sid + "/" + ta + "/output/", conf)){ - LOG.warn(e); - sendError(ctx, NO_CONTENT); + String pathString = queryBaseDir + "/" + sid + "/" + ta + "/output/"; + if (!lDirAlloc.ifExists(pathString, conf)) { + LOG.warn(pathString + "does not exist."); + sendError(ctx, HttpResponseStatus.NO_CONTENT); return; } - Path path = localFS.makeQualified( - lDirAlloc.getLocalPathToRead(queryBaseDir + "/" + sid + "/" + ta + "/output/", conf)); + Path path = localFS.makeQualified(lDirAlloc.getLocalPathToRead(queryBaseDir + "/" + sid + "/" + ta + + "/output/", conf)); String startKey = params.get("start").get(0); String endKey = params.get("end").get(0); boolean last = params.get("final") != null; @@ -555,7 +558,7 @@ public class TajoPullServerService extends AbstractService { chunk = getFileCunks(path, startKey, endKey, last); } catch (Throwable t) { LOG.error("ERROR Request: " + request.getUri(), t); - sendError(ctx, "Cannot get file chunks to be sent", BAD_REQUEST); + sendError(ctx, "Cannot get file chunks to be sent", HttpResponseStatus.BAD_REQUEST); return; } if (chunk != null) { @@ -568,7 +571,7 @@ public class TajoPullServerService extends AbstractService { String partPath = queryBaseDir + "/" + sid + "/hash-shuffle/" + partParentId + "/" + partId; if (!lDirAlloc.ifExists(partPath, conf)) { LOG.warn("Partition shuffle file not exists: " + partPath); - sendError(ctx, NO_CONTENT); + sendError(ctx, HttpResponseStatus.NO_CONTENT); return; } @@ -581,7 +584,7 @@ public class TajoPullServerService extends AbstractService { if (startPos >= file.length()) { String errorMessage = "Start pos[" + startPos + "] great than file length [" + file.length() + "]"; LOG.error(errorMessage); - sendError(ctx, errorMessage, BAD_REQUEST); + sendError(ctx, errorMessage, HttpResponseStatus.BAD_REQUEST); return; } LOG.info("RequestURL: " + request.getUri() + ", fileLen=" + file.length()); @@ -589,44 +592,53 @@ public class TajoPullServerService extends AbstractService { chunks.add(chunk); } else { LOG.error("Unknown shuffle type: " + shuffleType); - sendError(ctx, "Unknown shuffle type:" + shuffleType, BAD_REQUEST); + sendError(ctx, "Unknown shuffle type:" + shuffleType, HttpResponseStatus.BAD_REQUEST); return; } processingStatus.setNumFiles(chunks.size()); processingStatus.makeFileListTime = System.currentTimeMillis() - processingStatus.startTime; // Write the content. - Channel ch = e.getChannel(); if (chunks.size() == 0) { - HttpResponse response = new DefaultHttpResponse(HTTP_1_1, NO_CONTENT); - ch.write(response); - if (!isKeepAlive(request)) { - ch.close(); + HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NO_CONTENT); + + if (!HttpHeaders.isKeepAlive(request)) { + ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE); + } else { + response.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE); + ctx.writeAndFlush(response); } - } else { + } else { FileChunk[] file = chunks.toArray(new FileChunk[chunks.size()]); - HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK); + ChannelFuture writeFuture = null; + HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); long totalSize = 0; for (FileChunk chunk : file) { totalSize += chunk.length(); } - setContentLength(response, totalSize); + HttpHeaders.setContentLength(response, totalSize); + if (HttpHeaders.isKeepAlive(request)) { + response.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE); + } // Write the initial line and the header. - ch.write(response); - - ChannelFuture writeFuture = null; + writeFuture = ctx.write(response); for (FileChunk chunk : file) { - writeFuture = sendFile(ctx, ch, chunk, request.getUri().toString()); + writeFuture = sendFile(ctx, chunk, request.getUri().toString()); if (writeFuture == null) { - sendError(ctx, NOT_FOUND); + sendError(ctx, HttpResponseStatus.NOT_FOUND); return; } } + if (ctx.pipeline().get(SslHandler.class) == null) { + writeFuture = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT); + } else { + ctx.flush(); + } // Decide whether to close the connection or not. - if (!isKeepAlive(request)) { + if (!HttpHeaders.isKeepAlive(request)) { // Close the connection when the whole content is written out. writeFuture.addListener(ChannelFutureListener.CLOSE); } @@ -634,19 +646,18 @@ public class TajoPullServerService extends AbstractService { } private ChannelFuture sendFile(ChannelHandlerContext ctx, - Channel ch, FileChunk file, String requestUri) throws IOException { long startTime = System.currentTimeMillis(); - RandomAccessFile spill = null; + RandomAccessFile spill = null; ChannelFuture writeFuture; try { spill = new RandomAccessFile(file.getFile(), "r"); - if (ch.getPipeline().get(SslHandler.class) == null) { + if (ctx.pipeline().get(SslHandler.class) == null) { final FadvisedFileRegion filePart = new FadvisedFileRegion(spill, file.startOffset(), file.length(), manageOsCache, readaheadLength, readaheadPool, file.getFile().getAbsolutePath()); - writeFuture = ch.write(filePart); + writeFuture = ctx.write(filePart); writeFuture.addListener(new FileCloseListener(filePart, requestUri, startTime, TajoPullServerService.this)); } else { // HTTPS cannot be done with zero copy. @@ -654,7 +665,7 @@ public class TajoPullServerService extends AbstractService { file.startOffset(), file.length(), sslFileBufferSize, manageOsCache, readaheadLength, readaheadPool, file.getFile().getAbsolutePath()); - writeFuture = ch.write(chunk); + writeFuture = ctx.write(new HttpChunkedInput(chunk)); } } catch (FileNotFoundException e) { LOG.info(file.getFile() + " not found"); @@ -678,22 +689,20 @@ public class TajoPullServerService extends AbstractService { private void sendError(ChannelHandlerContext ctx, String message, HttpResponseStatus status) { - HttpResponse response = new DefaultHttpResponse(HTTP_1_1, status); - response.setHeader(CONTENT_TYPE, "text/plain; charset=UTF-8"); - response.setContent( - ChannelBuffers.copiedBuffer(message, CharsetUtil.UTF_8)); + FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status, + Unpooled.copiedBuffer(message, CharsetUtil.UTF_8)); + response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain; charset=UTF-8"); // Close the connection as soon as the error message is sent. - ctx.getChannel().write(response).addListener(ChannelFutureListener.CLOSE); + ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE); } @Override - public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { - LOG.error(e.getCause().getMessage(), e.getCause()); - //if channel.close() is not called, never closed files in this request - if (ctx.getChannel().isConnected()){ - ctx.getChannel().close(); + LOG.error(cause.getMessage(), cause); + if (ctx.channel().isOpen()) { + ctx.channel().close(); } } }
http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/AdvancedDataRetriever.java ---------------------------------------------------------------------- diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/AdvancedDataRetriever.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/AdvancedDataRetriever.java index 5591bba..fb91094 100644 --- a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/AdvancedDataRetriever.java +++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/AdvancedDataRetriever.java @@ -20,6 +20,7 @@ package org.apache.tajo.pullserver.retriever; import com.google.common.collect.Lists; import com.google.common.collect.Maps; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.tajo.ExecutionBlockId; @@ -27,9 +28,10 @@ import org.apache.tajo.TaskAttemptId; import org.apache.tajo.TaskId; import org.apache.tajo.pullserver.FileAccessForbiddenException; import org.apache.tajo.util.TajoIdUtils; -import org.jboss.netty.channel.ChannelHandlerContext; -import org.jboss.netty.handler.codec.http.HttpRequest; -import org.jboss.netty.handler.codec.http.QueryStringDecoder; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.http.HttpRequest; +import io.netty.handler.codec.http.QueryStringDecoder; import java.io.File; import java.io.FileNotFoundException; @@ -67,7 +69,7 @@ public class AdvancedDataRetriever implements DataRetriever { throws IOException { final Map<String, List<String>> params = - new QueryStringDecoder(request.getUri()).getParameters(); + new QueryStringDecoder(request.getUri()).parameters(); if (!params.containsKey("qid")) { throw new FileNotFoundException("No such qid: " + params.containsKey("qid")); http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/DataRetriever.java ---------------------------------------------------------------------- diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/DataRetriever.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/DataRetriever.java index 8f55f7b..0a1ad41 100644 --- a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/DataRetriever.java +++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/DataRetriever.java @@ -18,8 +18,8 @@ package org.apache.tajo.pullserver.retriever; -import org.jboss.netty.channel.ChannelHandlerContext; -import org.jboss.netty.handler.codec.http.HttpRequest; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.http.HttpRequest; import java.io.IOException; http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/DirectoryRetriever.java ---------------------------------------------------------------------- diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/DirectoryRetriever.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/DirectoryRetriever.java index dc63929..e26bcd6 100644 --- a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/DirectoryRetriever.java +++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/retriever/DirectoryRetriever.java @@ -18,11 +18,12 @@ package org.apache.tajo.pullserver.retriever; -import org.jboss.netty.channel.ChannelHandlerContext; -import org.jboss.netty.handler.codec.http.HttpRequest; import org.apache.tajo.pullserver.FileAccessForbiddenException; import org.apache.tajo.pullserver.HttpDataServerHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.http.HttpRequest; + import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-rpc/pom.xml ---------------------------------------------------------------------- diff --git a/tajo-rpc/pom.xml b/tajo-rpc/pom.xml index d0037ca..2dc3765 100644 --- a/tajo-rpc/pom.xml +++ b/tajo-rpc/pom.xml @@ -138,7 +138,15 @@ <dependencies> <dependency> <groupId>io.netty</groupId> - <artifactId>netty</artifactId> + <artifactId>netty-transport</artifactId> + </dependency> + <dependency> + <groupId>io.netty</groupId> + <artifactId>netty-codec</artifactId> + </dependency> + <dependency> + <groupId>io.netty</groupId> + <artifactId>netty-handler</artifactId> </dependency> <dependency> <groupId>commons-logging</groupId> http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java index 4b1842e..5845229 100644 --- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java +++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcClient.java @@ -20,12 +20,15 @@ package org.apache.tajo.rpc; import com.google.protobuf.Descriptors.MethodDescriptor; import com.google.protobuf.*; + +import io.netty.channel.*; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.tajo.rpc.RpcProtos.RpcRequest; import org.apache.tajo.rpc.RpcProtos.RpcResponse; -import org.jboss.netty.channel.*; -import org.jboss.netty.channel.socket.ClientSocketChannelFactory; + +import io.netty.util.ReferenceCountUtil; +import io.netty.util.concurrent.GenericFutureListener; import java.lang.reflect.Method; import java.net.InetSocketAddress; @@ -38,8 +41,7 @@ import static org.apache.tajo.rpc.RpcConnectionPool.RpcConnectionKey; public class AsyncRpcClient extends NettyClientBase { private static final Log LOG = LogFactory.getLog(AsyncRpcClient.class); - private final ChannelUpstreamHandler handler; - private final ChannelPipelineFactory pipeFactory; + private final ChannelInitializer<Channel> initializer; private final ProxyRpcChannel rpcChannel; private final AtomicInteger sequence = new AtomicInteger(0); @@ -56,7 +58,7 @@ public class AsyncRpcClient extends NettyClientBase { * new an instance through this constructor. */ AsyncRpcClient(final Class<?> protocol, - final InetSocketAddress addr, ClientSocketChannelFactory factory, int retries) + final InetSocketAddress addr, int retries) throws ClassNotFoundException, NoSuchMethodException, ConnectTimeoutException { this.protocol = protocol; @@ -65,10 +67,9 @@ public class AsyncRpcClient extends NettyClientBase { Class<?> serviceClass = Class.forName(serviceClassName); stubMethod = serviceClass.getMethod("newStub", RpcChannel.class); - this.handler = new ClientChannelUpstreamHandler(); - pipeFactory = new ProtoPipelineFactory(handler, + initializer = new ProtoChannelInitializer(new ClientChannelInboundHandler(), RpcResponse.getDefaultInstance()); - super.init(addr, pipeFactory, factory, retries); + super.init(addr, initializer, retries); rpcChannel = new ProxyRpcChannel(); this.key = new RpcConnectionKey(addr, protocol, true); } @@ -83,7 +84,7 @@ public class AsyncRpcClient extends NettyClientBase { try { return (T) stubMethod.invoke(null, rpcChannel); } catch (Exception e) { - throw new RuntimeException(e.getMessage(), e); + throw new RemoteException(e.getMessage(), e); } } @@ -91,12 +92,32 @@ public class AsyncRpcClient extends NettyClientBase { return this.rpcChannel; } + protected void sendExceptions(String message) { + for(Map.Entry<Integer, ResponseCallback> callbackEntry: requests.entrySet()) { + ResponseCallback callback = callbackEntry.getValue(); + Integer id = callbackEntry.getKey(); + + RpcResponse.Builder responseBuilder = RpcResponse.newBuilder() + .setErrorMessage(message) + .setId(id); + + callback.run(responseBuilder.build()); + } + } + + @Override + public void close() { + sendExceptions("AsyncRpcClient terminates all the connections"); + + super.close(); + } + private class ProxyRpcChannel implements RpcChannel { - private final ClientChannelUpstreamHandler handler; + private final ClientChannelInboundHandler handler; public ProxyRpcChannel() { - this.handler = getChannel().getPipeline() - .get(ClientChannelUpstreamHandler.class); + this.handler = getChannel().pipeline() + .get(ClientChannelInboundHandler.class); if (handler == null) { throw new IllegalArgumentException("Channel does not have " + @@ -117,7 +138,17 @@ public class AsyncRpcClient extends NettyClientBase { handler.registerCallback(nextSeqId, new ResponseCallback(controller, responseType, done)); - getChannel().write(rpcRequest); + ChannelPromise channelPromise = getChannel().newPromise(); + channelPromise.addListener(new GenericFutureListener<ChannelFuture>() { + + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if (!future.isSuccess()) { + handler.exceptionCaught(null, new ServiceException(future.cause())); + } + } + }); + getChannel().writeAndFlush(rpcRequest, channelPromise); } private Message buildRequest(int seqId, @@ -180,10 +211,11 @@ public class AsyncRpcClient extends NettyClientBase { private String getErrorMessage(String message) { return "Exception [" + protocol.getCanonicalName() + "(" + RpcUtils.normalizeInetSocketAddress((InetSocketAddress) - getChannel().getRemoteAddress()) + ")]: " + message; + getChannel().remoteAddress()) + ")]: " + message; } - private class ClientChannelUpstreamHandler extends SimpleChannelUpstreamHandler { + @ChannelHandler.Sharable + private class ClientChannelInboundHandler extends ChannelInboundHandlerAdapter { synchronized void registerCallback(int seqId, ResponseCallback callback) { @@ -196,37 +228,39 @@ public class AsyncRpcClient extends NettyClientBase { } @Override - public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - RpcResponse response = (RpcResponse) e.getMessage(); - ResponseCallback callback = requests.remove(response.getId()); + if (msg instanceof RpcResponse) { + try { + RpcResponse response = (RpcResponse) msg; + ResponseCallback callback = requests.remove(response.getId()); - if (callback == null) { - LOG.warn("Dangling rpc call"); - } else { - callback.run(response); + if (callback == null) { + LOG.warn("Dangling rpc call"); + } else { + callback.run(response); + } + } finally { + ReferenceCountUtil.release(msg); + } } } @Override - public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { - LOG.error(getRemoteAddress() + "," + protocol + "," + e.getCause().getMessage(), e.getCause()); - - for(Map.Entry<Integer, ResponseCallback> callbackEntry: requests.entrySet()) { - ResponseCallback callback = callbackEntry.getValue(); - Integer id = callbackEntry.getKey(); - - RpcResponse.Builder responseBuilder = RpcResponse.newBuilder() - .setErrorMessage(e.toString()) - .setId(id); + LOG.error(getRemoteAddress() + "," + protocol + "," + cause.getMessage(), cause); - callback.run(responseBuilder.build()); - } + sendExceptions(cause.getMessage()); + if(LOG.isDebugEnabled()) { - LOG.error("" + e.getCause(), e.getCause()); + LOG.error(cause.getMessage(), cause); } else { - LOG.error("RPC Exception:" + e.getCause()); + LOG.error("RPC Exception:" + cause.getMessage()); + } + + if (ctx != null && ctx.channel().isActive()) { + ctx.channel().close(); } } } http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java index f9c5d3b..3b5a747 100644 --- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java +++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/AsyncRpcServer.java @@ -18,16 +18,16 @@ package org.apache.tajo.rpc; +import com.google.protobuf.*; import com.google.protobuf.Descriptors.MethodDescriptor; -import com.google.protobuf.Message; -import com.google.protobuf.RpcCallback; -import com.google.protobuf.RpcController; -import com.google.protobuf.Service; + +import io.netty.channel.*; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.tajo.rpc.RpcProtos.RpcRequest; import org.apache.tajo.rpc.RpcProtos.RpcResponse; -import org.jboss.netty.channel.*; + +import io.netty.util.ReferenceCountUtil; import java.lang.reflect.Method; import java.net.InetSocketAddress; @@ -36,7 +36,7 @@ public class AsyncRpcServer extends NettyServerBase { private static final Log LOG = LogFactory.getLog(AsyncRpcServer.class); private final Service service; - private final ChannelPipelineFactory pipeline; + private final ChannelInitializer<Channel> initializer; public AsyncRpcServer(final Class<?> protocol, final Object instance, @@ -52,87 +52,97 @@ public class AsyncRpcServer extends NettyServerBase { Method method = serviceClass.getMethod("newReflectiveService", interfaceClass); this.service = (Service) method.invoke(null, instance); - ServerHandler handler = new ServerHandler(); - this.pipeline = new ProtoPipelineFactory(handler, - RpcRequest.getDefaultInstance()); - super.init(this.pipeline, workerNum); + this.initializer = new ProtoChannelInitializer(new ServerHandler(), RpcRequest.getDefaultInstance()); + super.init(this.initializer, workerNum); } - private class ServerHandler extends SimpleChannelUpstreamHandler { + @ChannelHandler.Sharable + private class ServerHandler extends ChannelInboundHandlerAdapter { @Override - public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent evt) - throws Exception { - - accepted.add(evt.getChannel()); + public void channelRegistered(ChannelHandlerContext ctx) throws Exception { + accepted.add(ctx.channel()); if(LOG.isDebugEnabled()){ LOG.debug(String.format(serviceName + " accepted number of connections (%d)", accepted.size())); } - super.channelOpen(ctx, evt); + super.channelRegistered(ctx); } @Override - public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) + public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { + accepted.remove(ctx.channel()); + if (LOG.isDebugEnabled()) { + LOG.debug(serviceName + " closes a connection. The number of current connections are " + accepted.size()); + } + super.channelUnregistered(ctx); + } + + @Override + public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception { + if (msg instanceof RpcRequest) { + try { + final RpcRequest request = (RpcRequest) msg; - final RpcRequest request = (RpcRequest) e.getMessage(); + String methodName = request.getMethodName(); + MethodDescriptor methodDescriptor = service.getDescriptorForType().findMethodByName(methodName); - String methodName = request.getMethodName(); - MethodDescriptor methodDescriptor = service.getDescriptorForType(). - findMethodByName(methodName); + if (methodDescriptor == null) { + throw new RemoteCallException(request.getId(), new NoSuchMethodException(methodName)); + } - if (methodDescriptor == null) { - throw new RemoteCallException(request.getId(), - new NoSuchMethodException(methodName)); - } + Message paramProto = null; + if (request.hasRequestMessage()) { + try { + paramProto = service.getRequestPrototype(methodDescriptor).newBuilderForType() + .mergeFrom(request.getRequestMessage()).build(); + } catch (Throwable t) { + throw new RemoteCallException(request.getId(), methodDescriptor, t); + } + } - Message paramProto = null; - if (request.hasRequestMessage()) { - try { - paramProto = service.getRequestPrototype(methodDescriptor) - .newBuilderForType().mergeFrom(request.getRequestMessage()). - build(); - } catch (Throwable t) { - throw new RemoteCallException(request.getId(), methodDescriptor, t); - } - } + final RpcController controller = new NettyRpcController(); - final Channel channel = e.getChannel(); - final RpcController controller = new NettyRpcController(); + RpcCallback<Message> callback = !request.hasId() ? null : new RpcCallback<Message>() { - RpcCallback<Message> callback = - !request.hasId() ? null : new RpcCallback<Message>() { + public void run(Message returnValue) { - public void run(Message returnValue) { + RpcResponse.Builder builder = RpcResponse.newBuilder().setId(request.getId()); - RpcResponse.Builder builder = RpcResponse.newBuilder() - .setId(request.getId()); + if (returnValue != null) { + builder.setResponseMessage(returnValue.toByteString()); + } - if (returnValue != null) { - builder.setResponseMessage(returnValue.toByteString()); - } + if (controller.failed()) { + builder.setErrorMessage(controller.errorText()); + } - if (controller.failed()) { - builder.setErrorMessage(controller.errorText()); - } + ctx.writeAndFlush(builder.build()); + } + }; - channel.write(builder.build()); - } - }; + service.callMethod(methodDescriptor, controller, paramProto, callback); - service.callMethod(methodDescriptor, controller, paramProto, callback); + } finally { + ReferenceCountUtil.release(msg); + } + } } @Override - public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception{ - - if (e.getCause() instanceof RemoteCallException) { - RemoteCallException callException = (RemoteCallException) e.getCause(); - e.getChannel().write(callException.getResponse()); + if (cause instanceof RemoteCallException) { + RemoteCallException callException = (RemoteCallException) cause; + ctx.writeAndFlush(callException.getResponse()); } else { - LOG.error(e.getCause()); + LOG.error(cause.getMessage()); + } + + if (ctx != null && ctx.channel().isActive()) { + ctx.channel().close(); } } + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java index 869919c..4ec5718 100644 --- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java +++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcClient.java @@ -18,22 +18,23 @@ package org.apache.tajo.rpc; -import com.google.protobuf.BlockingRpcChannel; +import com.google.protobuf.*; import com.google.protobuf.Descriptors.MethodDescriptor; -import com.google.protobuf.Message; -import com.google.protobuf.RpcController; -import com.google.protobuf.ServiceException; + +import io.netty.channel.*; +import io.netty.util.concurrent.*; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.tajo.rpc.RpcProtos.RpcRequest; import org.apache.tajo.rpc.RpcProtos.RpcResponse; -import org.jboss.netty.channel.*; -import org.jboss.netty.channel.socket.ClientSocketChannelFactory; + +import io.netty.util.ReferenceCountUtil; import java.lang.reflect.Method; import java.net.InetSocketAddress; import java.util.Map; import java.util.concurrent.*; +import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicInteger; import static org.apache.tajo.rpc.RpcConnectionPool.RpcConnectionKey; @@ -41,8 +42,7 @@ import static org.apache.tajo.rpc.RpcConnectionPool.RpcConnectionKey; public class BlockingRpcClient extends NettyClientBase { private static final Log LOG = LogFactory.getLog(RpcProtos.class); - private final ChannelUpstreamHandler handler; - private final ChannelPipelineFactory pipeFactory; + private final ChannelInitializer<Channel> initializer; private final ProxyRpcChannel rpcChannel; private final AtomicInteger sequence = new AtomicInteger(0); @@ -59,7 +59,7 @@ public class BlockingRpcClient extends NettyClientBase { * new an instance through this constructor. */ BlockingRpcClient(final Class<?> protocol, - final InetSocketAddress addr, ClientSocketChannelFactory factory, int retries) + final InetSocketAddress addr, int retries) throws ClassNotFoundException, NoSuchMethodException, ConnectTimeoutException { this.protocol = protocol; @@ -69,10 +69,8 @@ public class BlockingRpcClient extends NettyClientBase { stubMethod = serviceClass.getMethod("newBlockingStub", BlockingRpcChannel.class); - this.handler = new ClientChannelUpstreamHandler(); - pipeFactory = new ProtoPipelineFactory(handler, - RpcResponse.getDefaultInstance()); - super.init(addr, pipeFactory, factory, retries); + initializer = new ProtoChannelInitializer(new ClientChannelInboundHandler(), RpcResponse.getDefaultInstance()); + super.init(addr, initializer, retries); rpcChannel = new ProxyRpcChannel(); this.key = new RpcConnectionKey(addr, protocol, false); @@ -96,14 +94,24 @@ public class BlockingRpcClient extends NettyClientBase { return this.rpcChannel; } + @Override + public void close() { + for(ProtoCallFuture callback: requests.values()) { + callback.setFailed("BlockingRpcClient terminates all the connections", + new ServiceException("BlockingRpcClient terminates all the connections")); + } + + super.close(); + } + private class ProxyRpcChannel implements BlockingRpcChannel { - private final ClientChannelUpstreamHandler handler; + private final ClientChannelInboundHandler handler; public ProxyRpcChannel() { - this.handler = getChannel().getPipeline(). - get(ClientChannelUpstreamHandler.class); + this.handler = getChannel().pipeline(). + get(ClientChannelInboundHandler.class); if (handler == null) { throw new IllegalArgumentException("Channel does not have " + @@ -125,10 +133,20 @@ public class BlockingRpcClient extends NettyClientBase { ProtoCallFuture callFuture = new ProtoCallFuture(controller, responsePrototype); requests.put(nextSeqId, callFuture); - getChannel().write(rpcRequest); + + ChannelPromise channelPromise = getChannel().newPromise(); + channelPromise.addListener(new GenericFutureListener<ChannelFuture>() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if (!future.isSuccess()) { + handler.exceptionCaught(null, new ServiceException(future.cause())); + } + } + }); + getChannel().writeAndFlush(rpcRequest, channelPromise); try { - return callFuture.get(); + return callFuture.get(60, TimeUnit.SECONDS); } catch (Throwable t) { if (t instanceof ExecutionException) { Throwable cause = t.getCause(); @@ -159,7 +177,7 @@ public class BlockingRpcClient extends NettyClientBase { if(protocol != null && getChannel() != null) { return protocol.getName() + "(" + RpcUtils.normalizeInetSocketAddress((InetSocketAddress) - getChannel().getRemoteAddress()) + "): " + message; + getChannel().remoteAddress()) + "): " + message; } else { return "Exception " + message; } @@ -168,55 +186,64 @@ public class BlockingRpcClient extends NettyClientBase { private TajoServiceException makeTajoServiceException(RpcResponse response, Throwable cause) { if(protocol != null && getChannel() != null) { return new TajoServiceException(response.getErrorMessage(), cause, protocol.getName(), - RpcUtils.normalizeInetSocketAddress((InetSocketAddress)getChannel().getRemoteAddress())); + RpcUtils.normalizeInetSocketAddress((InetSocketAddress)getChannel().remoteAddress())); } else { return new TajoServiceException(response.getErrorMessage()); } } - private class ClientChannelUpstreamHandler extends SimpleChannelUpstreamHandler { + @ChannelHandler.Sharable + private class ClientChannelInboundHandler extends ChannelInboundHandlerAdapter { @Override - public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - RpcResponse rpcResponse = (RpcResponse) e.getMessage(); - ProtoCallFuture callback = requests.remove(rpcResponse.getId()); + if (msg instanceof RpcResponse) { + try { + RpcResponse rpcResponse = (RpcResponse) msg; + ProtoCallFuture callback = requests.remove(rpcResponse.getId()); - if (callback == null) { - LOG.warn("Dangling rpc call"); - } else { - if (rpcResponse.hasErrorMessage()) { - callback.setFailed(rpcResponse.getErrorMessage(), - makeTajoServiceException(rpcResponse, new ServiceException(rpcResponse.getErrorTrace()))); - throw new RemoteException(getErrorMessage(rpcResponse.getErrorMessage())); - } else { - Message responseMessage; - - if (!rpcResponse.hasResponseMessage()) { - responseMessage = null; + if (callback == null) { + LOG.warn("Dangling rpc call"); } else { - responseMessage = - callback.returnType.newBuilderForType(). - mergeFrom(rpcResponse.getResponseMessage()).build(); + if (rpcResponse.hasErrorMessage()) { + callback.setFailed(rpcResponse.getErrorMessage(), + makeTajoServiceException(rpcResponse, new ServiceException(rpcResponse.getErrorTrace()))); + throw new RemoteException(getErrorMessage(rpcResponse.getErrorMessage())); + } else { + Message responseMessage; + + if (!rpcResponse.hasResponseMessage()) { + responseMessage = null; + } else { + responseMessage = callback.returnType.newBuilderForType().mergeFrom(rpcResponse.getResponseMessage()) + .build(); + } + + callback.setResponse(responseMessage); + } } - - callback.setResponse(responseMessage); + } finally { + ReferenceCountUtil.release(msg); } } } @Override - public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { - e.getChannel().close(); for(ProtoCallFuture callback: requests.values()) { - callback.setFailed(e.getCause().getMessage(), e.getCause()); + callback.setFailed(cause.getMessage(), cause); } + if(LOG.isDebugEnabled()) { - LOG.error("" + e.getCause().getMessage(), e.getCause()); + LOG.error("" + cause.getMessage(), cause); } else { - LOG.error("RPC Exception:" + e.getCause().getMessage()); + LOG.error("RPC Exception:" + cause.getMessage()); + } + if (ctx != null && ctx.channel().isActive()) { + ctx.channel().close(); } } } @@ -253,6 +280,9 @@ public class BlockingRpcClient extends NettyClientBase { public Message get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { if(sem.tryAcquire(timeout, unit)) { + if (ee != null) { + throw ee; + } return response; } else { throw new TimeoutException(); http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java index 9e0d57c..0ce359f 100644 --- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java +++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/BlockingRpcServer.java @@ -22,19 +22,22 @@ import com.google.protobuf.BlockingService; import com.google.protobuf.Descriptors.MethodDescriptor; import com.google.protobuf.Message; import com.google.protobuf.RpcController; + +import io.netty.channel.*; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.jboss.netty.channel.*; import org.apache.tajo.rpc.RpcProtos.RpcRequest; import org.apache.tajo.rpc.RpcProtos.RpcResponse; +import io.netty.util.ReferenceCountUtil; + import java.lang.reflect.Method; import java.net.InetSocketAddress; public class BlockingRpcServer extends NettyServerBase { private static Log LOG = LogFactory.getLog(BlockingRpcServer.class); private final BlockingService service; - private final ChannelPipelineFactory pipeline; + private final ChannelInitializer<Channel> initializer; public BlockingRpcServer(final Class<?> protocol, final Object instance, @@ -53,78 +56,92 @@ public class BlockingRpcServer extends NettyServerBase { "newReflectiveBlockingService", interfaceClass); this.service = (BlockingService) method.invoke(null, instance); - this.pipeline = new ProtoPipelineFactory(new ServerHandler(), - RpcRequest.getDefaultInstance()); + this.initializer = new ProtoChannelInitializer(new ServerHandler(), RpcRequest.getDefaultInstance()); - super.init(this.pipeline, workerNum); + super.init(this.initializer, workerNum); } - private class ServerHandler extends SimpleChannelUpstreamHandler { + @ChannelHandler.Sharable + private class ServerHandler extends ChannelInboundHandlerAdapter { @Override - public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent evt) - throws Exception { - - accepted.add(evt.getChannel()); + public void channelRegistered(ChannelHandlerContext ctx) throws Exception { + accepted.add(ctx.channel()); if(LOG.isDebugEnabled()){ LOG.debug(String.format(serviceName + " accepted number of connections (%d)", accepted.size())); } - super.channelOpen(ctx, evt); + super.channelRegistered(ctx); } @Override - public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) - throws Exception { - final RpcRequest request = (RpcRequest) e.getMessage(); + public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { + accepted.remove(ctx.channel()); + if (LOG.isDebugEnabled()) { + LOG.debug(serviceName + " closes a connection. The number of current connections are " + accepted.size()); + } + super.channelUnregistered(ctx); + } - String methodName = request.getMethodName(); - MethodDescriptor methodDescriptor = - service.getDescriptorForType().findMethodByName(methodName); + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) + throws Exception { - if (methodDescriptor == null) { - throw new RemoteCallException(request.getId(), - new NoSuchMethodException(methodName)); - } - Message paramProto = null; - if (request.hasRequestMessage()) { + if (msg instanceof RpcRequest) { try { - paramProto = service.getRequestPrototype(methodDescriptor) - .newBuilderForType().mergeFrom(request.getRequestMessage()). - build(); - - } catch (Throwable t) { - throw new RemoteCallException(request.getId(), methodDescriptor, t); + final RpcRequest request = (RpcRequest) msg; + + String methodName = request.getMethodName(); + MethodDescriptor methodDescriptor = service.getDescriptorForType().findMethodByName(methodName); + + if (methodDescriptor == null) { + throw new RemoteCallException(request.getId(), new NoSuchMethodException(methodName)); + } + Message paramProto = null; + if (request.hasRequestMessage()) { + try { + paramProto = service.getRequestPrototype(methodDescriptor).newBuilderForType() + .mergeFrom(request.getRequestMessage()).build(); + + } catch (Throwable t) { + throw new RemoteCallException(request.getId(), methodDescriptor, t); + } + } + Message returnValue; + RpcController controller = new NettyRpcController(); + + try { + returnValue = service.callBlockingMethod(methodDescriptor, controller, paramProto); + } catch (Throwable t) { + throw new RemoteCallException(request.getId(), methodDescriptor, t); + } + + RpcResponse.Builder builder = RpcResponse.newBuilder().setId(request.getId()); + + if (returnValue != null) { + builder.setResponseMessage(returnValue.toByteString()); + } + + if (controller.failed()) { + builder.setErrorMessage(controller.errorText()); + } + ctx.writeAndFlush(builder.build()); + } finally { + ReferenceCountUtil.release(msg); } } - Message returnValue; - RpcController controller = new NettyRpcController(); - - try { - returnValue = service.callBlockingMethod(methodDescriptor, - controller, paramProto); - } catch (Throwable t) { - throw new RemoteCallException(request.getId(), methodDescriptor, t); - } - - RpcResponse.Builder builder = - RpcResponse.newBuilder().setId(request.getId()); - - if (returnValue != null) { - builder.setResponseMessage(returnValue.toByteString()); - } - - if (controller.failed()) { - builder.setErrorMessage(controller.errorText()); - } - e.getChannel().write(builder.build()); } @Override - public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) { - if (e.getCause() instanceof RemoteCallException) { - RemoteCallException callException = (RemoteCallException) e.getCause(); - e.getChannel().write(callException.getResponse()); + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + if (cause instanceof RemoteCallException) { + RemoteCallException callException = (RemoteCallException) cause; + ctx.writeAndFlush(callException.getResponse()); + } + + if (ctx != null && ctx.channel().isActive()) { + ctx.channel().close(); } } + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-rpc/src/main/java/org/apache/tajo/rpc/CallFuture.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/CallFuture.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/CallFuture.java index fd612a5..c4c3256 100644 --- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/CallFuture.java +++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/CallFuture.java @@ -50,14 +50,14 @@ public class CallFuture<T> implements RpcCallback<T>, Future<T> { @Override public boolean cancel(boolean mayInterruptIfRunning) { - // TODO - to be implemented - throw new UnsupportedOperationException(); + controller.startCancel(); + sem.release(); + return controller.isCanceled(); } @Override public boolean isCancelled() { - // TODO - to be implemented - throw new UnsupportedOperationException(); + return controller.isCanceled(); } @Override http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-rpc/src/main/java/org/apache/tajo/rpc/DefaultRpcController.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/DefaultRpcController.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/DefaultRpcController.java index 1bf0ed8..4ba19a5 100644 --- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/DefaultRpcController.java +++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/DefaultRpcController.java @@ -24,9 +24,13 @@ import com.google.protobuf.RpcController; public class DefaultRpcController implements RpcController { private String errorText; private boolean error; + private boolean canceled; @Override public void reset() { + errorText = ""; + error = false; + canceled = false; } @Override @@ -41,6 +45,7 @@ public class DefaultRpcController implements RpcController { @Override public void startCancel() { + this.canceled = true; } @Override @@ -51,7 +56,7 @@ public class DefaultRpcController implements RpcController { @Override public boolean isCanceled() { - return false; + return canceled; } @Override http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java index bc0c567..7b52178 100644 --- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java +++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyClientBase.java @@ -18,12 +18,16 @@ package org.apache.tajo.rpc; +import io.netty.channel.*; + import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.jboss.netty.bootstrap.ClientBootstrap; -import org.jboss.netty.channel.*; -import org.jboss.netty.channel.socket.ClientSocketChannelFactory; + +import io.netty.bootstrap.Bootstrap; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.util.concurrent.GenericFutureListener; import java.io.Closeable; import java.net.InetSocketAddress; @@ -37,7 +41,7 @@ public abstract class NettyClientBase implements Closeable { private static final long PAUSE = 1000; // 1 sec private int numRetries; - protected ClientBootstrap bootstrap; + protected Bootstrap bootstrap; private ChannelFuture channelFuture; public NettyClientBase() { @@ -46,55 +50,39 @@ public abstract class NettyClientBase implements Closeable { public abstract <T> T getStub(); public abstract RpcConnectionPool.RpcConnectionKey getKey(); - public void init(InetSocketAddress addr, ChannelPipelineFactory pipeFactory, ClientSocketChannelFactory factory, + public void init(InetSocketAddress addr, ChannelInitializer<Channel> initializer, int numRetries) throws ConnectTimeoutException { this.numRetries = numRetries; - init(addr, pipeFactory, factory); + init(addr, initializer); } - public void init(InetSocketAddress addr, ChannelPipelineFactory pipeFactory, ClientSocketChannelFactory factory) + public void init(InetSocketAddress addr, ChannelInitializer<Channel> initializer) throws ConnectTimeoutException { - this.bootstrap = new ClientBootstrap(factory); - this.bootstrap.setPipelineFactory(pipeFactory); - // TODO - should be configurable - this.bootstrap.setOption("connectTimeoutMillis", 10000); - this.bootstrap.setOption("connectResponseTimeoutMillis", 10000); - this.bootstrap.setOption("receiveBufferSize", 1048576 * 10); - this.bootstrap.setOption("tcpNoDelay", true); - this.bootstrap.setOption("keepAlive", true); + this.bootstrap = new Bootstrap(); + this.bootstrap + .channel(NioSocketChannel.class) + .handler(initializer) + .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) + .option(ChannelOption.SO_REUSEADDR, true) + .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000) + .option(ChannelOption.SO_RCVBUF, 1048576 * 10) + .option(ChannelOption.TCP_NODELAY, true); connect(addr); } + + private void connectUsingNetty(InetSocketAddress address, GenericFutureListener<ChannelFuture> listener) { + + this.channelFuture = bootstrap.clone().group(RpcChannelFactory.getSharedClientEventloopGroup()) + .connect(address) + .addListener(listener); + } private void handleConnectionInternally(final InetSocketAddress addr) throws ConnectTimeoutException { - this.channelFuture = bootstrap.connect(addr); - final CountDownLatch latch = new CountDownLatch(1); - this.channelFuture.addListener(new ChannelFutureListener() { - private final AtomicInteger retryCount = new AtomicInteger(); - - @Override - public void operationComplete(ChannelFuture future) throws Exception { - if (!future.isSuccess()) { - if (numRetries > retryCount.getAndIncrement()) { - Thread.sleep(PAUSE); - channelFuture = bootstrap.connect(addr); - channelFuture.addListener(this); - - LOG.debug("Connecting to " + addr + " has been failed. Retrying to connect."); - } - else { - latch.countDown(); - - LOG.error("Max retry count has been exceeded. attempts=" + numRetries); - } - } - else { - latch.countDown(); - } - } - }); + GenericFutureListener<ChannelFuture> listener = new RetryConnectionListener(addr, latch); + connectUsingNetty(addr, listener); try { latch.await(CLIENT_CONNECTION_TIMEOUT_SEC, TimeUnit.SECONDS); @@ -103,7 +91,7 @@ public abstract class NettyClientBase implements Closeable { if (!channelFuture.isSuccess()) { throw new ConnectTimeoutException("Connect error to " + addr + - " caused by " + ExceptionUtils.getMessage(channelFuture.getCause())); + " caused by " + ExceptionUtils.getMessage(channelFuture.cause())); } } @@ -115,34 +103,67 @@ public abstract class NettyClientBase implements Closeable { handleConnectionInternally(addr); } - public boolean isConnected() { - return getChannel().isConnected(); + class RetryConnectionListener implements GenericFutureListener<ChannelFuture> { + private final AtomicInteger retryCount = new AtomicInteger(); + private final InetSocketAddress address; + private final CountDownLatch latch; + + RetryConnectionListener(InetSocketAddress address, CountDownLatch latch) { + this.address = address; + this.latch = latch; + } + + @Override + public void operationComplete(ChannelFuture channelFuture) throws Exception { + if (!channelFuture.isSuccess()) { + channelFuture.channel().close(); + + if (numRetries > retryCount.getAndIncrement()) { + final GenericFutureListener<ChannelFuture> currentListener = this; + + RpcChannelFactory.getSharedClientEventloopGroup().schedule(new Runnable() { + @Override + public void run() { + connectUsingNetty(address, currentListener); + } + }, PAUSE, TimeUnit.MILLISECONDS); + + LOG.debug("Connecting to " + address + " has been failed. Retrying to connect."); + } + else { + latch.countDown(); + + LOG.error("Max retry count has been exceeded. attempts=" + numRetries); + } + } + else { + latch.countDown(); + } + } + } + + public boolean isActive() { + return getChannel().isActive(); } public InetSocketAddress getRemoteAddress() { - if (channelFuture == null || channelFuture.getChannel() == null) { + if (channelFuture == null || channelFuture.channel() == null) { return null; } - return (InetSocketAddress) channelFuture.getChannel().getRemoteAddress(); + return (InetSocketAddress) channelFuture.channel().remoteAddress(); } public Channel getChannel() { - return channelFuture.getChannel(); + return channelFuture.channel(); } @Override public void close() { - if(this.channelFuture != null && getChannel().isOpen()) { - try { - getChannel().close().awaitUninterruptibly(); - } catch (Throwable ce) { - LOG.warn(ce); - } + if (channelFuture != null && getChannel().isActive()) { + getChannel().close(); } - if(this.bootstrap != null) { - // This line will shutdown the factory - // this.bootstrap.releaseExternalResources(); + if (this.bootstrap != null) { InetSocketAddress address = getRemoteAddress(); if (address != null) { LOG.debug("Proxy is disconnected from " + address.getHostName() + ":" + address.getPort()); http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyServerBase.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyServerBase.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyServerBase.java index ef090ff..1b45ac9 100644 --- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyServerBase.java +++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/NettyServerBase.java @@ -20,19 +20,23 @@ package org.apache.tajo.rpc; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.jboss.netty.bootstrap.ServerBootstrap; -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.ChannelFactory; -import org.jboss.netty.channel.ChannelPipelineFactory; -import org.jboss.netty.channel.group.ChannelGroup; -import org.jboss.netty.channel.group.DefaultChannelGroup; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.group.ChannelGroup; +import io.netty.channel.group.DefaultChannelGroup; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.util.concurrent.GlobalEventExecutor; import java.io.IOException; import java.net.DatagramSocket; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.util.Random; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; public class NettyServerBase { @@ -43,10 +47,10 @@ public class NettyServerBase { protected String serviceName; protected InetSocketAddress serverAddr; protected InetSocketAddress bindAddress; - protected ChannelPipelineFactory pipelineFactory; + protected ChannelInitializer<Channel> initializer; protected ServerBootstrap bootstrap; - protected Channel channel; - protected ChannelGroup accepted = new DefaultChannelGroup(); + protected ChannelFuture channelFuture; + protected ChannelGroup accepted = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); private InetSocketAddress initIsa; @@ -63,19 +67,19 @@ public class NettyServerBase { this.serviceName = name; } - public void init(ChannelPipelineFactory pipeline, int workerNum) { - ChannelFactory factory = RpcChannelFactory.createServerChannelFactory(serviceName, workerNum); - - pipelineFactory = pipeline; - bootstrap = new ServerBootstrap(factory); - bootstrap.setPipelineFactory(pipelineFactory); - // TODO - should be configurable - bootstrap.setOption("reuseAddress", true); - bootstrap.setOption("child.tcpNoDelay", true); - bootstrap.setOption("child.keepAlive", true); - bootstrap.setOption("child.connectTimeoutMillis", 10000); - bootstrap.setOption("child.connectResponseTimeoutMillis", 10000); - bootstrap.setOption("child.receiveBufferSize", 1048576 * 10); + public void init(ChannelInitializer<Channel> initializer, int workerNum) { + bootstrap = RpcChannelFactory.createServerChannelFactory(serviceName, workerNum); + + this.initializer = initializer; + bootstrap + .channel(NioServerSocketChannel.class) + .childHandler(initializer) + .option(ChannelOption.SO_REUSEADDR, true) + .option(ChannelOption.TCP_NODELAY, true) + .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) + .childOption(ChannelOption.TCP_NODELAY, true) + .childOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000) + .childOption(ChannelOption.SO_RCVBUF, 1048576 * 10); } public InetSocketAddress getListenAddress() { @@ -98,28 +102,41 @@ public class NettyServerBase { serverAddr = initIsa; } - this.channel = bootstrap.bind(serverAddr); - this.bindAddress = (InetSocketAddress) channel.getLocalAddress(); + this.channelFuture = bootstrap.clone().bind(serverAddr).syncUninterruptibly(); + this.bindAddress = (InetSocketAddress) channelFuture.channel().localAddress(); LOG.info("Rpc (" + serviceName + ") listens on " + this.bindAddress); } public Channel getChannel() { - return this.channel; + return this.channelFuture.channel(); } public void shutdown() { - if(channel != null) { - channel.close().awaitUninterruptibly(); - } + shutdown(false); + } + public void shutdown(boolean waitUntilThreadsStop) { try { - accepted.close().awaitUninterruptibly(10, TimeUnit.SECONDS); + accepted.close(); } catch (Throwable t) { LOG.error(t.getMessage(), t); } + if(bootstrap != null) { - bootstrap.releaseExternalResources(); + if (bootstrap.childGroup() != null) { + bootstrap.childGroup().shutdownGracefully(); + if (waitUntilThreadsStop) { + bootstrap.childGroup().terminationFuture().awaitUninterruptibly(); + } + } + + if (bootstrap.group() != null) { + bootstrap.group().shutdownGracefully(); + if (waitUntilThreadsStop) { + bootstrap.childGroup().terminationFuture().awaitUninterruptibly(); + } + } } if (bindAddress != null) { @@ -138,13 +155,14 @@ public class NettyServerBase { // each system has a different starting port number within the given range. private static final AtomicInteger nextPortNum = new AtomicInteger(startPortRange+ rnd.nextInt(endPortRange - startPortRange)); + private static final Object lockObject = new Object(); private synchronized static int getUnusedPort() throws IOException { while (true) { int port = nextPortNum.getAndIncrement(); if (port >= endPortRange) { - synchronized (nextPortNum) { + synchronized (lockObject) { nextPortNum.set(startPortRange); port = nextPortNum.getAndIncrement(); } http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-rpc/src/main/java/org/apache/tajo/rpc/NullCallback.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/NullCallback.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/NullCallback.java index 70135a6..9b7f8ac 100644 --- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/NullCallback.java +++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/NullCallback.java @@ -20,7 +20,7 @@ package org.apache.tajo.rpc; import com.google.protobuf.RpcCallback; -public class NullCallback implements RpcCallback { +public class NullCallback implements RpcCallback<Object> { private final static NullCallback instance; static { http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-rpc/src/main/java/org/apache/tajo/rpc/ProtoChannelInitializer.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/ProtoChannelInitializer.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/ProtoChannelInitializer.java new file mode 100644 index 0000000..6a340dc --- /dev/null +++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/ProtoChannelInitializer.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.rpc; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPipeline; +import io.netty.handler.codec.protobuf.ProtobufDecoder; +import io.netty.handler.codec.protobuf.ProtobufEncoder; +import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder; +import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender; + +import com.google.protobuf.MessageLite; + +class ProtoChannelInitializer extends ChannelInitializer<Channel> { + private final MessageLite defaultInstance; + private final ChannelHandler handler; + + public ProtoChannelInitializer(ChannelHandler handler, MessageLite defaultInstance) { + this.handler = handler; + this.defaultInstance = defaultInstance; + } + + @Override + protected void initChannel(Channel channel) throws Exception { + ChannelPipeline pipeline = channel.pipeline(); + pipeline.addLast("frameDecoder", new ProtobufVarint32FrameDecoder()); + pipeline.addLast("protobufDecoder", new ProtobufDecoder(defaultInstance)); + pipeline.addLast("frameEncoder", new ProtobufVarint32LengthFieldPrepender()); + pipeline.addLast("protobufEncoder", new ProtobufEncoder()); + pipeline.addLast("handler", handler); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/22876a82/tajo-rpc/src/main/java/org/apache/tajo/rpc/ProtoPipelineFactory.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/ProtoPipelineFactory.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/ProtoPipelineFactory.java deleted file mode 100644 index 7aa03e7..0000000 --- a/tajo-rpc/src/main/java/org/apache/tajo/rpc/ProtoPipelineFactory.java +++ /dev/null @@ -1,50 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.rpc; - -import com.google.protobuf.MessageLite; -import org.jboss.netty.channel.ChannelPipeline; -import org.jboss.netty.channel.ChannelPipelineFactory; -import org.jboss.netty.channel.ChannelUpstreamHandler; -import org.jboss.netty.channel.Channels; -import org.jboss.netty.handler.codec.protobuf.ProtobufDecoder; -import org.jboss.netty.handler.codec.protobuf.ProtobufEncoder; -import org.jboss.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder; -import org.jboss.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender; - -public class ProtoPipelineFactory implements ChannelPipelineFactory { - private final ChannelUpstreamHandler handler; - private final MessageLite defaultInstance; - - public ProtoPipelineFactory(ChannelUpstreamHandler handlerFactory, - MessageLite defaultInstance) { - this.handler = handlerFactory; - this.defaultInstance = defaultInstance; - } - - public ChannelPipeline getPipeline() throws Exception { - ChannelPipeline p = Channels.pipeline(); - p.addLast("frameDecoder", new ProtobufVarint32FrameDecoder()); - p.addLast("protobufDecoder", new ProtobufDecoder(defaultInstance)); - p.addLast("frameEncoder", new ProtobufVarint32LengthFieldPrepender()); - p.addLast("protobufEncoder", new ProtobufEncoder()); - p.addLast("handler", handler); - return p; - } -}
