Repository: tajo Updated Branches: refs/heads/branch-0.10.1 e75b928fe -> 75e985eac
TAJO-1564: TestFetcher fails occasionally. (jinho) Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/75e985ea Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/75e985ea Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/75e985ea Branch: refs/heads/branch-0.10.1 Commit: 75e985eaca8146020b2b9d79e609ba1ed041998f Parents: e75b928 Author: Jinho Kim <[email protected]> Authored: Sat Apr 18 14:53:05 2015 +0900 Committer: Jinho Kim <[email protected]> Committed: Sat Apr 18 14:53:05 2015 +0900 ---------------------------------------------------------------------- CHANGES | 2 + .../java/org/apache/tajo/worker/Fetcher.java | 47 +++++++------------- 2 files changed, 18 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/75e985ea/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 5b1b2b6..995305e 100644 --- a/CHANGES +++ b/CHANGES @@ -27,6 +27,8 @@ Release 0.10.1 - unreleased BUG FIXES + TAJO-1564: TestFetcher fails occasionally. (jinho) + TAJO-1538: TajoWorkerResourceManager.allocatedResourceMap is increasing forever. (Contributed by navis. Committed by jinho) http://git-wip-us.apache.org/repos/asf/tajo/blob/75e985ea/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java b/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java index 31599a3..94488d0 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java @@ -18,8 +18,15 @@ package org.apache.tajo.worker; +import io.netty.bootstrap.Bootstrap; +import io.netty.buffer.ByteBuf; import io.netty.buffer.PooledByteBufAllocator; import io.netty.channel.*; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.codec.http.*; +import io.netty.handler.timeout.ReadTimeoutException; +import io.netty.handler.timeout.ReadTimeoutHandler; +import io.netty.util.ReferenceCountUtil; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.io.IOUtils; @@ -28,24 +35,6 @@ import org.apache.tajo.conf.TajoConf; import org.apache.tajo.pullserver.retriever.FileChunk; import org.apache.tajo.rpc.RpcChannelFactory; -import io.netty.bootstrap.Bootstrap; -import io.netty.buffer.ByteBuf; -import io.netty.channel.socket.nio.NioSocketChannel; -import io.netty.handler.codec.http.DefaultHttpRequest; -import io.netty.handler.codec.http.HttpClientCodec; -import io.netty.handler.codec.http.HttpContent; -import io.netty.handler.codec.http.HttpContentDecompressor; -import io.netty.handler.codec.http.HttpHeaders; -import io.netty.handler.codec.http.HttpMethod; -import io.netty.handler.codec.http.HttpRequest; -import io.netty.handler.codec.http.HttpResponse; -import io.netty.handler.codec.http.HttpResponseStatus; -import io.netty.handler.codec.http.HttpVersion; -import io.netty.handler.codec.http.LastHttpContent; -import io.netty.handler.timeout.ReadTimeoutException; -import io.netty.handler.timeout.ReadTimeoutHandler; -import io.netty.util.ReferenceCountUtil; - import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; @@ -72,7 +61,7 @@ public class Fetcher { private final boolean useLocalFile; private long startTime; - private long finishTime; + private volatile long finishTime; private long fileLen; private int messageReceiveCount; private TajoProtos.FetcherState state; @@ -167,19 +156,17 @@ public class Fetcher { LOG.info("Status: " + getState() + ", URI:" + uri); // Send the HTTP request. - ChannelFuture channelFuture = channel.writeAndFlush(request); - - // Wait for the server to close the connection. - channel.closeFuture().awaitUninterruptibly(); + channel.writeAndFlush(request); - channelFuture.addListener(ChannelFutureListener.CLOSE); + // Wait for the server to close the connection. throw exception if failed + channel.closeFuture().syncUninterruptibly(); fileChunk.setLength(fileChunk.getFile().length()); return fileChunk; } finally { - if(future != null){ + if(future != null && future.channel().isOpen()){ // Close the channel to exit. - future.channel().close(); + future.channel().close().awaitUninterruptibly(); } this.finishTime = System.currentTimeMillis(); @@ -262,14 +249,12 @@ public class Fetcher { fileLen = file.length(); } - IOUtils.cleanup(LOG, fc, raf); - if (ctx.channel().isActive()) { - ctx.channel().close(); - } finishTime = System.currentTimeMillis(); if (state != TajoProtos.FetcherState.FETCH_FAILED) { state = TajoProtos.FetcherState.FETCH_FINISHED; } + + IOUtils.cleanup(LOG, fc, raf); } } catch (Exception e) { LOG.error(e.getMessage(), e); @@ -283,7 +268,7 @@ public class Fetcher { public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { if (cause instanceof ReadTimeoutException) { - LOG.warn(cause, cause); + LOG.warn(cause.getMessage(), cause); } else { LOG.error("Fetch failed :", cause); }
