Repository: tajo
Updated Branches:
  refs/heads/branch-0.10.1 e75b928fe -> 75e985eac


TAJO-1564: TestFetcher fails occasionally. (jinho)


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/75e985ea
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/75e985ea
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/75e985ea

Branch: refs/heads/branch-0.10.1
Commit: 75e985eaca8146020b2b9d79e609ba1ed041998f
Parents: e75b928
Author: Jinho Kim <[email protected]>
Authored: Sat Apr 18 14:53:05 2015 +0900
Committer: Jinho Kim <[email protected]>
Committed: Sat Apr 18 14:53:05 2015 +0900

----------------------------------------------------------------------
 CHANGES                                         |  2 +
 .../java/org/apache/tajo/worker/Fetcher.java    | 47 +++++++-------------
 2 files changed, 18 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/75e985ea/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 5b1b2b6..995305e 100644
--- a/CHANGES
+++ b/CHANGES
@@ -27,6 +27,8 @@ Release 0.10.1 - unreleased
 
   BUG FIXES
 
+    TAJO-1564: TestFetcher fails occasionally. (jinho)
+
     TAJO-1538: TajoWorkerResourceManager.allocatedResourceMap is increasing 
     forever. (Contributed by navis. Committed by jinho)
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/75e985ea/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java b/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
index 31599a3..94488d0 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
@@ -18,8 +18,15 @@
 
 package org.apache.tajo.worker;
 
+import io.netty.bootstrap.Bootstrap;
+import io.netty.buffer.ByteBuf;
 import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.channel.*;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.codec.http.*;
+import io.netty.handler.timeout.ReadTimeoutException;
+import io.netty.handler.timeout.ReadTimeoutHandler;
+import io.netty.util.ReferenceCountUtil;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.IOUtils;
@@ -28,24 +35,6 @@ import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.pullserver.retriever.FileChunk;
 import org.apache.tajo.rpc.RpcChannelFactory;
 
-import io.netty.bootstrap.Bootstrap;
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.socket.nio.NioSocketChannel;
-import io.netty.handler.codec.http.DefaultHttpRequest;
-import io.netty.handler.codec.http.HttpClientCodec;
-import io.netty.handler.codec.http.HttpContent;
-import io.netty.handler.codec.http.HttpContentDecompressor;
-import io.netty.handler.codec.http.HttpHeaders;
-import io.netty.handler.codec.http.HttpMethod;
-import io.netty.handler.codec.http.HttpRequest;
-import io.netty.handler.codec.http.HttpResponse;
-import io.netty.handler.codec.http.HttpResponseStatus;
-import io.netty.handler.codec.http.HttpVersion;
-import io.netty.handler.codec.http.LastHttpContent;
-import io.netty.handler.timeout.ReadTimeoutException;
-import io.netty.handler.timeout.ReadTimeoutHandler;
-import io.netty.util.ReferenceCountUtil;
-
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -72,7 +61,7 @@ public class Fetcher {
   private final boolean useLocalFile;
 
   private long startTime;
-  private long finishTime;
+  private volatile long finishTime;
   private long fileLen;
   private int messageReceiveCount;
   private TajoProtos.FetcherState state;
@@ -167,19 +156,17 @@ public class Fetcher {
 
       LOG.info("Status: " + getState() + ", URI:" + uri);
       // Send the HTTP request.
-      ChannelFuture channelFuture = channel.writeAndFlush(request);
-
-      // Wait for the server to close the connection.
-      channel.closeFuture().awaitUninterruptibly();
+      channel.writeAndFlush(request);
 
-      channelFuture.addListener(ChannelFutureListener.CLOSE);
+      // Wait for the server to close the connection. throw exception if failed
+      channel.closeFuture().syncUninterruptibly();
 
       fileChunk.setLength(fileChunk.getFile().length());
       return fileChunk;
     } finally {
-      if(future != null){
+      if(future != null && future.channel().isOpen()){
         // Close the channel to exit.
-        future.channel().close();
+        future.channel().close().awaitUninterruptibly();
       }
 
       this.finishTime = System.currentTimeMillis();
@@ -262,14 +249,12 @@ public class Fetcher {
               fileLen = file.length();
             }
 
-            IOUtils.cleanup(LOG, fc, raf);
-            if (ctx.channel().isActive()) {
-              ctx.channel().close();
-            }
             finishTime = System.currentTimeMillis();
             if (state != TajoProtos.FetcherState.FETCH_FAILED) {
               state = TajoProtos.FetcherState.FETCH_FINISHED;
             }
+
+            IOUtils.cleanup(LOG, fc, raf);
           }
         } catch (Exception e) {
           LOG.error(e.getMessage(), e);
@@ -283,7 +268,7 @@ public class Fetcher {
     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
         throws Exception {
       if (cause instanceof ReadTimeoutException) {
-        LOG.warn(cause, cause);
+        LOG.warn(cause.getMessage(), cause);
       } else {
         LOG.error("Fetch failed :", cause);
       }

Reply via email to