Repository: tajo
Updated Branches:
  refs/heads/master 9f873b129 -> 01857dadd


TAJO-908: Fetcher does not retry when the pull server connection is closed.
(jinho)
Closes #58


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/01857dad
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/01857dad
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/01857dad

Branch: refs/heads/master
Commit: 01857dadd42bd55894ce6edb55159e80c30f6c30
Parents: 9f873b1
Author: jinossy <[email protected]>
Authored: Fri Jul 11 22:07:43 2014 +0900
Committer: jinossy <[email protected]>
Committed: Fri Jul 11 22:07:43 2014 +0900

----------------------------------------------------------------------
 CHANGES                                         |   3 +
 .../java/org/apache/tajo/conf/TajoConf.java     |   3 +
 tajo-common/src/main/proto/tajo_protos.proto    |   1 +
 .../java/org/apache/tajo/worker/Fetcher.java    | 119 ++++++++++----
 .../main/java/org/apache/tajo/worker/Task.java  |  24 +--
 .../org/apache/tajo/worker/TestFetcher.java     | 162 ++++++++++++++-----
 .../tajo/pullserver/TajoPullServerService.java  |  26 ++-
 7 files changed, 244 insertions(+), 94 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/01857dad/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 2feba78..87cf7b0 100644
--- a/CHANGES
+++ b/CHANGES
@@ -81,6 +81,9 @@ Release 0.9.0 - unreleased
     (Hyoungjun Kim via hyunsik)
 
   BUG FIXES
+
+    TAJO-908: Fetcher does not retry when the pull server connection is closed.
+    (jinho)
  
     TAJO-926: Join condition including column references of a row-preserving
     table in left outer join causes incorrect result. (hyunsik)

http://git-wip-us.apache.org/repos/asf/tajo/blob/01857dad/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java 
b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index 6298d27..dd5327d 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -217,6 +217,9 @@ public class TajoConf extends Configuration {
     SHUFFLE_SSL_ENABLED_KEY("tajo.pullserver.ssl.enabled", false),
     SHUFFLE_FILE_FORMAT("tajo.shuffle.file-format", "RAW"),
     
SHUFFLE_FETCHER_PARALLEL_EXECUTION_MAX_NUM("tajo.shuffle.fetcher.parallel-execution.max-num",
 2),
+    SHUFFLE_FETCHER_CHUNK_MAX_SIZE("tajo.shuffle.fetcher.chunk.max-size",  
8192 * 8),
+    SHUFFLE_FETCHER_READ_TIMEOUT("tajo.shuffle.fetcher.read.timeout-sec", 5),
+    
SHUFFLE_FETCHER_READ_RETRY_MAX_NUM("tajo.shuffle.fetcher.read.retry.max-num", 
5),
 
     //////////////////////////////////
     // Storage Configuration

http://git-wip-us.apache.org/repos/asf/tajo/blob/01857dad/tajo-common/src/main/proto/tajo_protos.proto
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/proto/tajo_protos.proto 
b/tajo-common/src/main/proto/tajo_protos.proto
index a7aa4f7..edd27fc 100644
--- a/tajo-common/src/main/proto/tajo_protos.proto
+++ b/tajo-common/src/main/proto/tajo_protos.proto
@@ -51,4 +51,5 @@ enum FetcherState {
   FETCH_INIT = 0;
   FETCH_FETCHING = 1;
   FETCH_FINISHED = 2;
+  FETCH_FAILED = 3;
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/01857dad/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java 
b/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
index 37c653c..2aa2875 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java
@@ -22,11 +22,16 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.tajo.TajoProtos;
+import org.apache.tajo.conf.TajoConf;
 import org.jboss.netty.bootstrap.ClientBootstrap;
 import org.jboss.netty.buffer.ChannelBuffer;
 import org.jboss.netty.channel.*;
 import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
 import org.jboss.netty.handler.codec.http.*;
+import org.jboss.netty.handler.timeout.ReadTimeoutException;
+import org.jboss.netty.handler.timeout.ReadTimeoutHandler;
+import org.jboss.netty.util.HashedWheelTimer;
+import org.jboss.netty.util.Timer;
 
 import java.io.File;
 import java.io.FileNotFoundException;
@@ -48,6 +53,7 @@ public class Fetcher {
 
   private final URI uri;
   private final File file;
+  private final TajoConf conf;
 
   private final String host;
   private int port;
@@ -57,13 +63,15 @@ public class Fetcher {
   private long fileLen;
   private int messageReceiveCount;
   private TajoProtos.FetcherState state;
+  private Timer timer;
 
   private ClientBootstrap bootstrap;
 
-  public Fetcher(URI uri, File file, ClientSocketChannelFactory factory) {
+  public Fetcher(TajoConf conf, URI uri, File file, ClientSocketChannelFactory 
factory) {
     this.uri = uri;
     this.file = file;
     this.state = TajoProtos.FetcherState.FETCH_INIT;
+    this.conf = conf;
 
     String scheme = uri.getScheme() == null ? "http" : uri.getScheme();
     this.host = uri.getHost() == null ? "localhost" : uri.getHost();
@@ -106,41 +114,47 @@ public class Fetcher {
   }
 
   public File get() throws IOException {
-    startTime = System.currentTimeMillis();
+    this.startTime = System.currentTimeMillis();
     this.state = TajoProtos.FetcherState.FETCH_FETCHING;
 
-    ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, 
port));
+    try {
+      ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, 
port));
 
-    // Wait until the connection attempt succeeds or fails.
-    Channel channel = future.awaitUninterruptibly().getChannel();
-    if (!future.isSuccess()) {
-      future.getChannel().close();
-      throw new IOException(future.getCause());
-    }
-
-    String query = uri.getPath()
-        + (uri.getRawQuery() != null ? "?" + uri.getRawQuery() : "");
-    // Prepare the HTTP request.
-    HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, 
HttpMethod.GET, query);
-    request.setHeader(HttpHeaders.Names.HOST, host);
-    LOG.info("Fetch: " + uri);
-    request.setHeader(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE);
-    request.setHeader(HttpHeaders.Names.ACCEPT_ENCODING, 
HttpHeaders.Values.GZIP);
+      // Wait until the connection attempt succeeds or fails.
+      Channel channel = future.awaitUninterruptibly().getChannel();
+      if (!future.isSuccess()) {
+        future.getChannel().close();
+        state = TajoProtos.FetcherState.FETCH_FAILED;
+        throw new IOException(future.getCause());
+      }
 
+      String query = uri.getPath()
+          + (uri.getRawQuery() != null ? "?" + uri.getRawQuery() : "");
+      // Prepare the HTTP request.
+      HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, 
HttpMethod.GET, query);
+      request.setHeader(HttpHeaders.Names.HOST, host);
+      request.setHeader(HttpHeaders.Names.CONNECTION, 
HttpHeaders.Values.CLOSE);
+      request.setHeader(HttpHeaders.Names.ACCEPT_ENCODING, 
HttpHeaders.Values.GZIP);
 
-    // Send the HTTP request.
-    ChannelFuture channelFuture = channel.write(request);
+      LOG.info("Status: " + getState() + ", URI:" + uri);
+      // Send the HTTP request.
+      ChannelFuture channelFuture = channel.write(request);
 
-    // Wait for the server to close the connection.
-    channel.getCloseFuture().awaitUninterruptibly();
+      // Wait for the server to close the connection.
+      channel.getCloseFuture().awaitUninterruptibly();
 
-    channelFuture.addListener(ChannelFutureListener.CLOSE);
+      channelFuture.addListener(ChannelFutureListener.CLOSE);
 
-    // Close the channel to exit.
-    future.getChannel().close();
-    finishTime = System.currentTimeMillis();
-    this.state = TajoProtos.FetcherState.FETCH_FINISHED;
-    return file;
+      // Close the channel to exit.
+      future.getChannel().close();
+      return file;
+    } finally {
+      this.finishTime = System.currentTimeMillis();
+      LOG.info("Status: " + getState() + ", URI:" + uri);
+      if (timer != null) {
+        timer.stop();
+      }
+    }
   }
 
   public URI getURI() {
@@ -161,9 +175,11 @@ public class Fetcher {
     @Override
     public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
         throws Exception {
+
       messageReceiveCount++;
       try {
-        if (!readingChunks) {
+        if (!readingChunks && e.getMessage() instanceof HttpResponse) {
+
           HttpResponse response = (HttpResponse) e.getMessage();
 
           StringBuilder sb = new StringBuilder();
@@ -188,8 +204,13 @@ public class Fetcher {
             LOG.debug(sb.toString());
           }
 
-          if (response.getStatus() == HttpResponseStatus.NO_CONTENT) {
-            LOG.info("There are no data corresponding to the request");
+          if (response.getStatus().getCode() == 
HttpResponseStatus.NO_CONTENT.getCode()) {
+            LOG.warn("There are no data corresponding to the request");
+            length = 0;
+            return;
+          } else if (response.getStatus().getCode() != 
HttpResponseStatus.OK.getCode()){
+            LOG.error(response.getStatus().getReasonPhrase());
+            state = TajoProtos.FetcherState.FETCH_FAILED;
             return;
           }
 
@@ -217,7 +238,9 @@ public class Fetcher {
                   + "(received/total: " + fileLength + "/" + length + ")");
             }
           } else {
-            fc.write(chunk.getContent().toByteBuffer());
+            if(fc != null){
+              fc.write(chunk.getContent().toByteBuffer());
+            }
           }
         }
       } finally {
@@ -225,12 +248,32 @@ public class Fetcher {
           fileLen = file.length();
         }
 
-        if(fileLen >= length){
+        if(fileLen == length){
           IOUtils.cleanup(LOG, fc, raf);
-
+          finishTime = System.currentTimeMillis();
+          state = TajoProtos.FetcherState.FETCH_FINISHED;
         }
       }
     }
+
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
+        throws Exception {
+      if (e.getCause() instanceof ReadTimeoutException) {
+        LOG.warn(e.getCause());
+      } else {
+        LOG.error("Fetch failed :", e.getCause());
+      }
+
+      if(ctx.getChannel().isConnected()){
+        ctx.getChannel().close().setFailure(e.getCause());
+      }
+
+      // Release the output file handles and mark this fetch as failed so the caller can retry it.
+      IOUtils.cleanup(LOG, fc, raf);
+      finishTime = System.currentTimeMillis();
+      state = TajoProtos.FetcherState.FETCH_FAILED;
+    }
   }
 
   class HttpClientPipelineFactory implements
@@ -245,8 +288,14 @@ public class Fetcher {
     public ChannelPipeline getPipeline() throws Exception {
       ChannelPipeline pipeline = pipeline();
 
-      pipeline.addLast("codec", new HttpClientCodec());
+      timer = new HashedWheelTimer();
+
+      int maxChunkSize = 
conf.getIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_CHUNK_MAX_SIZE);
+      int readTimeout = 
conf.getIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_READ_TIMEOUT);
+
+      pipeline.addLast("codec", new HttpClientCodec(4096, 8192, maxChunkSize));
       pipeline.addLast("inflater", new HttpContentDecompressor());
+      pipeline.addLast("timeout", new ReadTimeoutHandler(timer, readTimeout));
       pipeline.addLast("handler", new HttpClientHandler(file));
       return pipeline;
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/01857dad/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java 
b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
index a960f69..9350838 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java
@@ -576,23 +576,25 @@ public class Task {
   private class FetchRunner implements Runnable {
     private final TaskAttemptContext ctx;
     private final Fetcher fetcher;
+    private int maxRetryNum;
 
     public FetchRunner(TaskAttemptContext ctx, Fetcher fetcher) {
       this.ctx = ctx;
       this.fetcher = fetcher;
+      this.maxRetryNum = 
systemConf.getIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_READ_RETRY_MAX_NUM);
     }
 
     @Override
     public void run() {
       int retryNum = 0;
-      int maxRetryNum = 5;
-      int retryWaitTime = 1000;
+      int retryWaitTime = 1000; // ms
 
       try { // for releasing fetch latch
         while(!killed && retryNum < maxRetryNum) {
           if (retryNum > 0) {
             try {
               Thread.sleep(retryWaitTime);
+              retryWaitTime = Math.min(10 * 1000, retryWaitTime * 2);  // max 
10 seconds
             } catch (InterruptedException e) {
               LOG.error(e);
             }
@@ -600,7 +602,7 @@ public class Task {
           }
           try {
             File fetched = fetcher.get();
-            if (fetched != null) {
+            if (fetcher.getState() == TajoProtos.FetcherState.FETCH_FINISHED 
&& fetched != null) {
               break;
             }
           } catch (IOException e) {
@@ -609,11 +611,15 @@ public class Task {
           retryNum++;
         }
       } finally {
-        fetcherFinished(ctx);
-      }
-
-      if (retryNum == maxRetryNum) {
-        LOG.error("ERROR: the maximum retry (" + retryNum + ") on the fetch 
exceeded (" + fetcher.getURI() + ")");
+        if(fetcher.getState() == TajoProtos.FetcherState.FETCH_FINISHED){
+          fetcherFinished(ctx);
+        } else {
+          if (retryNum == maxRetryNum) {
+            LOG.error("ERROR: the maximum retry (" + retryNum + ") on the 
fetch exceeded (" + fetcher.getURI() + ")");
+          }
+          aborted = true; // retry queryUnit
+          ctx.getFetchLatch().countDown();
+        }
       }
     }
   }
@@ -674,7 +680,7 @@ public class Task {
             storeDir.mkdirs();
           }
           storeFile = new File(storeDir, "in_" + i);
-          Fetcher fetcher = new Fetcher(uri, storeFile, channelFactory);
+          Fetcher fetcher = new Fetcher(systemConf, uri, storeFile, 
channelFactory);
           runnerList.add(fetcher);
           i++;
         }

http://git-wip-us.apache.org/repos/asf/tajo/blob/01857dad/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java 
b/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java
index c933294..82d662b 100644
--- a/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java
+++ b/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java
@@ -18,70 +18,87 @@
 
 package org.apache.tajo.worker;
 
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.fs.*;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.QueryIdFactory;
 import org.apache.tajo.TajoProtos;
 import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.pullserver.TajoPullServerService;
 import org.apache.tajo.rpc.RpcChannelFactory;
 import org.apache.tajo.util.CommonTestingUtil;
-import org.apache.tajo.worker.dataserver.HttpDataServer;
-import org.apache.tajo.worker.dataserver.retriever.DataRetriever;
-import org.apache.tajo.worker.dataserver.retriever.DirectoryRetriever;
 import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
 import java.io.File;
-import java.io.FileWriter;
 import java.io.IOException;
-import java.net.InetSocketAddress;
 import java.net.URI;
 import java.util.Random;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 
 public class TestFetcher {
   private String TEST_DATA = "target/test-data/TestFetcher";
   private String INPUT_DIR = TEST_DATA+"/in/";
   private String OUTPUT_DIR = TEST_DATA+"/out/";
+  private TajoConf conf = new TajoConf();
+  private TajoPullServerService pullServerService;
+  private ClientSocketChannelFactory channelFactory;
 
   @Before
   public void setUp() throws Exception {
     CommonTestingUtil.getTestDir(TEST_DATA);
     CommonTestingUtil.getTestDir(INPUT_DIR);
     CommonTestingUtil.getTestDir(OUTPUT_DIR);
+    conf.setVar(TajoConf.ConfVars.WORKER_TEMPORAL_DIR, INPUT_DIR);
+    conf.setIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_READ_TIMEOUT, 1);
+    conf.setIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_CHUNK_MAX_SIZE, 127);
+
+    pullServerService = new TajoPullServerService();
+    pullServerService.init(conf);
+    pullServerService.start();
+
+    channelFactory = RpcChannelFactory.createClientChannelFactory("Fetcher", 
1);
+  }
+
+  @After
+  public void tearDown(){
+    pullServerService.stop();
+    channelFactory.releaseExternalResources();
   }
 
   @Test
   public void testGet() throws IOException {
     Random rnd = new Random();
-    FileWriter writer = new FileWriter(INPUT_DIR + "data");
-    String data;
+    QueryId queryId = QueryIdFactory.NULL_QUERY_ID;
+    String sid = "1";
+    String ta = "1_0";
+    String partId = "1";
+
+    String dataPath = INPUT_DIR + queryId.toString() + "/output"+ "/" + sid + 
"/" +ta + "/output/" + partId;
+    String params = String.format("qid=%s&sid=%s&p=%s&type=%s&ta=%s", queryId, 
sid, partId, "h", ta);
+
+    Path inputPath = new Path(dataPath);
+    FSDataOutputStream stream =  LocalFileSystem.get(conf).create(inputPath, 
true);
     for (int i = 0; i < 100; i++) {
-      data = ""+rnd.nextInt();
-      writer.write(data);
+      String data = ""+rnd.nextInt();
+      stream.write(data.getBytes());
     }
-    writer.flush();
-    writer.close();
-
-    DataRetriever ret = new DirectoryRetriever(INPUT_DIR);
-    HttpDataServer server = new HttpDataServer(
-        NetUtils.createSocketAddr("127.0.0.1:0"), ret);
-    server.start();
-    InetSocketAddress addr = server.getBindAddress();
-    
-    URI uri = URI.create("http://127.0.0.1:"+addr.getPort() + "/data");
-    ClientSocketChannelFactory channelFactory = 
RpcChannelFactory.createClientChannelFactory("Fetcher", 1);
-    Fetcher fetcher = new Fetcher(uri, new File(OUTPUT_DIR + "data"), 
channelFactory);
-    fetcher.get();
-    server.stop();
-    
+    stream.flush();
+    stream.close();
+
+    URI uri = URI.create("http://127.0.0.1:" + pullServerService.getPort() + 
"/?" + params);
+    final Fetcher fetcher = new Fetcher(conf, uri, new File(OUTPUT_DIR + 
"data"), channelFactory);
+    assertNotNull(fetcher.get());
+
     FileSystem fs = FileSystem.getLocal(new TajoConf());
-    FileStatus inStatus = fs.getFileStatus(new Path(INPUT_DIR, "data"));
+    FileStatus inStatus = fs.getFileStatus(inputPath);
     FileStatus outStatus = fs.getFileStatus(new Path(OUTPUT_DIR, "data"));
+
     assertEquals(inStatus.getLen(), outStatus.getLen());
+    assertEquals(TajoProtos.FetcherState.FETCH_FINISHED, fetcher.getState());
   }
 
   @Test
@@ -96,29 +113,86 @@ public class TestFetcher {
   @Test
   public void testStatus() throws Exception {
     Random rnd = new Random();
-    FileWriter writer = new FileWriter(INPUT_DIR + "data");
-    String data;
+    QueryId queryId = QueryIdFactory.NULL_QUERY_ID;
+    String sid = "1";
+    String ta = "1_0";
+    String partId = "1";
+
+    String dataPath = INPUT_DIR + queryId.toString() + "/output"+ "/" + sid + 
"/" +ta + "/output/" + partId;
+    String params = String.format("qid=%s&sid=%s&p=%s&type=%s&ta=%s", queryId, 
sid, partId, "h", ta);
+
+    FSDataOutputStream stream =  LocalFileSystem.get(conf).create(new 
Path(dataPath), true);
     for (int i = 0; i < 100; i++) {
-      data = ""+rnd.nextInt();
-      writer.write(data);
+      String data = ""+rnd.nextInt();
+      stream.write(data.getBytes());
     }
-    writer.flush();
-    writer.close();
+    stream.flush();
+    stream.close();
+
+    URI uri = URI.create("http://127.0.0.1:" + pullServerService.getPort() + 
"/?" + params);
+    final Fetcher fetcher = new Fetcher(conf, uri, new File(OUTPUT_DIR + 
"data"), channelFactory);
+    assertEquals(TajoProtos.FetcherState.FETCH_INIT, fetcher.getState());
 
-    DataRetriever ret = new DirectoryRetriever(INPUT_DIR);
-    final HttpDataServer server = new HttpDataServer(
-        NetUtils.createSocketAddr("127.0.0.1:0"), ret);
-    server.start();
-    InetSocketAddress addr = server.getBindAddress();
+    fetcher.get();
+    assertEquals(TajoProtos.FetcherState.FETCH_FINISHED, fetcher.getState());
+  }
+
+  @Test
+  public void testNoContentFetch() throws Exception {
 
-    URI uri = URI.create("http://127.0.0.1:"+addr.getPort() + "/data");
-    ClientSocketChannelFactory channelFactory = 
RpcChannelFactory.createClientChannelFactory("Fetcher", 1);
+    QueryId queryId = QueryIdFactory.NULL_QUERY_ID;
+    String sid = "1";
+    String ta = "1_0";
+    String partId = "1";
 
-    final Fetcher fetcher = new Fetcher(uri, new File(OUTPUT_DIR + "data"), 
channelFactory);
+    String dataPath = INPUT_DIR + queryId.toString() + "/output"+ "/" + sid + 
"/" +ta + "/output/" + partId;
+    String params = String.format("qid=%s&sid=%s&p=%s&type=%s&ta=%s", queryId, 
sid, partId, "h", ta);
+
+    Path inputPath = new Path(dataPath);
+    if(LocalFileSystem.get(conf).exists(inputPath)){
+      LocalFileSystem.get(conf).delete(new Path(dataPath), true);
+    }
+
+    FSDataOutputStream stream =  LocalFileSystem.get(conf).create(new 
Path(dataPath).getParent(), true);
+    stream.close();
+
+    URI uri = URI.create("http://127.0.0.1:" + pullServerService.getPort() + 
"/?" + params);
+    final Fetcher fetcher = new Fetcher(conf, uri, new File(OUTPUT_DIR + 
"data"), channelFactory);
     assertEquals(TajoProtos.FetcherState.FETCH_INIT, fetcher.getState());
 
     fetcher.get();
     assertEquals(TajoProtos.FetcherState.FETCH_FINISHED, fetcher.getState());
-    server.stop();
+  }
+
+  @Test
+  public void testFailureStatus() throws Exception {
+    Random rnd = new Random();
+
+    QueryId queryId = QueryIdFactory.NULL_QUERY_ID;
+    String sid = "1";
+    String ta = "1_0";
+    String partId = "1";
+
+    String dataPath = INPUT_DIR + queryId.toString() + "/output"+ "/" + sid + 
"/" +ta + "/output/" + partId;
+
+    // TajoPullServerService responds with BAD_REQUEST for an unknown shuffle type
+    String shuffleType = "x";
+    String params = String.format("qid=%s&sid=%s&p=%s&type=%s&ta=%s", queryId, 
sid, partId, shuffleType, ta);
+
+    FSDataOutputStream stream =  LocalFileSystem.get(conf).create(new 
Path(dataPath), true);
+
+    for (int i = 0; i < 100; i++) {
+      String data = params + rnd.nextInt();
+      stream.write(data.getBytes());
+    }
+    stream.flush();
+    stream.close();
+
+    URI uri = URI.create("http://127.0.0.1:" + pullServerService.getPort() + 
"/?" + params);
+    final Fetcher fetcher = new Fetcher(conf, uri, new File(OUTPUT_DIR + 
"data"), channelFactory);
+    assertEquals(TajoProtos.FetcherState.FETCH_INIT, fetcher.getState());
+
+    fetcher.get();
+    assertEquals(TajoProtos.FetcherState.FETCH_FAILED, fetcher.getState());
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/01857dad/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
----------------------------------------------------------------------
diff --git 
a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
 
b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
index cc3cb2e..373642b 100644
--- 
a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
+++ 
b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java
@@ -299,9 +299,11 @@ public class TajoPullServerService extends AbstractService 
{
       if (sslFactory != null) {
         pipeline.addLast("ssl", new SslHandler(sslFactory.createSSLEngine()));
       }
-      pipeline.addLast("decoder", new HttpRequestDecoder());
+
+      int maxChunkSize = 
getConfig().getInt(ConfVars.SHUFFLE_FETCHER_CHUNK_MAX_SIZE.varname,
+          ConfVars.SHUFFLE_FETCHER_CHUNK_MAX_SIZE.defaultIntVal);
+      pipeline.addLast("codec", new HttpServerCodec(4096, 8192, maxChunkSize));
       pipeline.addLast("aggregator", new HttpChunkAggregator(1 << 16));
-      pipeline.addLast("encoder", new HttpResponseEncoder());
       pipeline.addLast("chunking", new ChunkedWriteHandler());
       pipeline.addLast("shuffle", PullServer);
       return pipeline;
@@ -319,11 +321,14 @@ public class TajoPullServerService extends 
AbstractService {
       new LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname);
     private int port;
 
-    public PullServer(Configuration conf) {
+    public PullServer(Configuration conf) throws IOException {
       this.conf = conf;
 //      indexCache = new IndexCache(new JobConf(conf));
       this.port = conf.getInt(ConfVars.PULLSERVER_PORT.varname,
           ConfVars.PULLSERVER_PORT.defaultIntVal);
+
+      // init local temporal dir
+      lDirAlloc.getAllLocalPathsToRead(".", conf);
     }
     
     public void setPort(int port) {
@@ -402,9 +407,13 @@ public class TajoPullServerService extends AbstractService 
{
       // if a subquery requires a range shuffle
       if (shuffleType.equals("r")) {
         String ta = taskIds.get(0);
+        if(!lDirAlloc.ifExists(queryBaseDir + "/" + sid + "/" + ta + 
"/output/", conf)){
+          LOG.warn(e);
+          sendError(ctx, NO_CONTENT);
+          return;
+        }
         Path path = localFS.makeQualified(
             lDirAlloc.getLocalPathToRead(queryBaseDir + "/" + sid + "/" + ta + 
"/output/", conf));
-
         String startKey = params.get("start").get(0);
         String endKey = params.get("end").get(0);
         boolean last = params.get("final") != null;
@@ -424,15 +433,20 @@ public class TajoPullServerService extends 
AbstractService {
         // if a subquery requires a hash shuffle
       } else if (shuffleType.equals("h")) {
         for (String ta : taskIds) {
+          if (!lDirAlloc.ifExists(queryBaseDir + "/" + sid + "/" + ta + 
"/output/" + partId, conf)) {
+            LOG.warn(e);
+            sendError(ctx, NO_CONTENT);
+            return;
+          }
           Path path = localFS.makeQualified(
-              lDirAlloc.getLocalPathToRead(queryBaseDir + "/" + sid + "/" +
-                  ta + "/output/" + partId, conf));
+              lDirAlloc.getLocalPathToRead(queryBaseDir + "/" + sid + "/" + ta 
+ "/output/" + partId, conf));
           File file = new File(path.toUri());
           FileChunk chunk = new FileChunk(file, 0, file.length());
           chunks.add(chunk);
         }
       } else {
         LOG.error("Unknown shuffle type: " + shuffleType);
+        sendError(ctx, "Unknown shuffle type:" + shuffleType, BAD_REQUEST);
         return;
       }
 

Reply via email to