Repository: tajo Updated Branches: refs/heads/master 9f873b129 -> 01857dadd
TAJO-908: Fetcher does not retry, when pull server connection was closed. (jinho) Closes #58 Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/01857dad Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/01857dad Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/01857dad Branch: refs/heads/master Commit: 01857dadd42bd55894ce6edb55159e80c30f6c30 Parents: 9f873b1 Author: jinossy <[email protected]> Authored: Fri Jul 11 22:07:43 2014 +0900 Committer: jinossy <[email protected]> Committed: Fri Jul 11 22:07:43 2014 +0900 ---------------------------------------------------------------------- CHANGES | 3 + .../java/org/apache/tajo/conf/TajoConf.java | 3 + tajo-common/src/main/proto/tajo_protos.proto | 1 + .../java/org/apache/tajo/worker/Fetcher.java | 119 ++++++++++---- .../main/java/org/apache/tajo/worker/Task.java | 24 +-- .../org/apache/tajo/worker/TestFetcher.java | 162 ++++++++++++++----- .../tajo/pullserver/TajoPullServerService.java | 26 ++- 7 files changed, 244 insertions(+), 94 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/01857dad/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 2feba78..87cf7b0 100644 --- a/CHANGES +++ b/CHANGES @@ -81,6 +81,9 @@ Release 0.9.0 - unreleased (Hyoungjun Kim via hyunsik) BUG FIXES + + TAJO-908: Fetcher does not retry, when pull server connection was closed. + (jinho) TAJO-926: Join condition including column references of a row-preserving table in left outer join causes incorrect result. (hyunsik) http://git-wip-us.apache.org/repos/asf/tajo/blob/01857dad/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java index 6298d27..dd5327d 100644 --- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java +++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java @@ -217,6 +217,9 @@ public class TajoConf extends Configuration { SHUFFLE_SSL_ENABLED_KEY("tajo.pullserver.ssl.enabled", false), SHUFFLE_FILE_FORMAT("tajo.shuffle.file-format", "RAW"), SHUFFLE_FETCHER_PARALLEL_EXECUTION_MAX_NUM("tajo.shuffle.fetcher.parallel-execution.max-num", 2), + SHUFFLE_FETCHER_CHUNK_MAX_SIZE("tajo.shuffle.fetcher.chunk.max-size", 8192 * 8), + SHUFFLE_FETCHER_READ_TIMEOUT("tajo.shuffle.fetcher.read.timeout-sec", 5), + SHUFFLE_FETCHER_READ_RETRY_MAX_NUM("tajo.shuffle.fetcher.read.retry.max-num", 5), ////////////////////////////////// // Storage Configuration http://git-wip-us.apache.org/repos/asf/tajo/blob/01857dad/tajo-common/src/main/proto/tajo_protos.proto ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/proto/tajo_protos.proto b/tajo-common/src/main/proto/tajo_protos.proto index a7aa4f7..edd27fc 100644 --- a/tajo-common/src/main/proto/tajo_protos.proto +++ b/tajo-common/src/main/proto/tajo_protos.proto @@ -51,4 +51,5 @@ enum FetcherState { FETCH_INIT = 0; FETCH_FETCHING = 1; FETCH_FINISHED = 2; + FETCH_FAILED = 3; } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/01857dad/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java b/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java index 37c653c..2aa2875 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java @@ -22,11 +22,16 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.io.IOUtils; import org.apache.tajo.TajoProtos; +import org.apache.tajo.conf.TajoConf; import org.jboss.netty.bootstrap.ClientBootstrap; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.channel.*; import org.jboss.netty.channel.socket.ClientSocketChannelFactory; import org.jboss.netty.handler.codec.http.*; +import org.jboss.netty.handler.timeout.ReadTimeoutException; +import org.jboss.netty.handler.timeout.ReadTimeoutHandler; +import org.jboss.netty.util.HashedWheelTimer; +import org.jboss.netty.util.Timer; import java.io.File; import java.io.FileNotFoundException; @@ -48,6 +53,7 @@ public class Fetcher { private final URI uri; private final File file; + private final TajoConf conf; private final String host; private int port; @@ -57,13 +63,15 @@ public class Fetcher { private long fileLen; private int messageReceiveCount; private TajoProtos.FetcherState state; + private Timer timer; private ClientBootstrap bootstrap; - public Fetcher(URI uri, File file, ClientSocketChannelFactory factory) { + public Fetcher(TajoConf conf, URI uri, File file, ClientSocketChannelFactory factory) { this.uri = uri; this.file = file; this.state = TajoProtos.FetcherState.FETCH_INIT; + this.conf = conf; String scheme = uri.getScheme() == null ? "http" : uri.getScheme(); this.host = uri.getHost() == null ? "localhost" : uri.getHost(); @@ -106,41 +114,47 @@ public class Fetcher { } public File get() throws IOException { - startTime = System.currentTimeMillis(); + this.startTime = System.currentTimeMillis(); this.state = TajoProtos.FetcherState.FETCH_FETCHING; - ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port)); + try { + ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port)); - // Wait until the connection attempt succeeds or fails. - Channel channel = future.awaitUninterruptibly().getChannel(); - if (!future.isSuccess()) { - future.getChannel().close(); - throw new IOException(future.getCause()); - } - - String query = uri.getPath() - + (uri.getRawQuery() != null ? "?" + uri.getRawQuery() : ""); - // Prepare the HTTP request. - HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, query); - request.setHeader(HttpHeaders.Names.HOST, host); - LOG.info("Fetch: " + uri); - request.setHeader(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE); - request.setHeader(HttpHeaders.Names.ACCEPT_ENCODING, HttpHeaders.Values.GZIP); + // Wait until the connection attempt succeeds or fails. + Channel channel = future.awaitUninterruptibly().getChannel(); + if (!future.isSuccess()) { + future.getChannel().close(); + state = TajoProtos.FetcherState.FETCH_FAILED; + throw new IOException(future.getCause()); + } + String query = uri.getPath() + + (uri.getRawQuery() != null ? "?" + uri.getRawQuery() : ""); + // Prepare the HTTP request. + HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, query); + request.setHeader(HttpHeaders.Names.HOST, host); + request.setHeader(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE); + request.setHeader(HttpHeaders.Names.ACCEPT_ENCODING, HttpHeaders.Values.GZIP); - // Send the HTTP request. - ChannelFuture channelFuture = channel.write(request); + LOG.info("Status: " + getState() + ", URI:" + uri); + // Send the HTTP request. + ChannelFuture channelFuture = channel.write(request); - // Wait for the server to close the connection. - channel.getCloseFuture().awaitUninterruptibly(); + // Wait for the server to close the connection. + channel.getCloseFuture().awaitUninterruptibly(); - channelFuture.addListener(ChannelFutureListener.CLOSE); + channelFuture.addListener(ChannelFutureListener.CLOSE); - // Close the channel to exit. - future.getChannel().close(); - finishTime = System.currentTimeMillis(); - this.state = TajoProtos.FetcherState.FETCH_FINISHED; - return file; + // Close the channel to exit. + future.getChannel().close(); + return file; + } finally { + this.finishTime = System.currentTimeMillis(); + LOG.info("Status: " + getState() + ", URI:" + uri); + if (timer != null) { + timer.stop(); + } + } } public URI getURI() { @@ -161,9 +175,11 @@ public class Fetcher { @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { + messageReceiveCount++; try { - if (!readingChunks) { + if (!readingChunks && e.getMessage() instanceof HttpResponse) { + HttpResponse response = (HttpResponse) e.getMessage(); StringBuilder sb = new StringBuilder(); @@ -188,8 +204,13 @@ public class Fetcher { LOG.debug(sb.toString()); } - if (response.getStatus() == HttpResponseStatus.NO_CONTENT) { - LOG.info("There are no data corresponding to the request"); + if (response.getStatus().getCode() == HttpResponseStatus.NO_CONTENT.getCode()) { + LOG.warn("There are no data corresponding to the request"); + length = 0; + return; + } else if (response.getStatus().getCode() != HttpResponseStatus.OK.getCode()){ + LOG.error(response.getStatus().getReasonPhrase()); + state = TajoProtos.FetcherState.FETCH_FAILED; return; } @@ -217,7 +238,9 @@ public class Fetcher { + "(received/total: " + fileLength + "/" + length + ")"); } } else { - fc.write(chunk.getContent().toByteBuffer()); + if(fc != null){ + fc.write(chunk.getContent().toByteBuffer()); + } } } } finally { @@ -225,12 +248,32 @@ public class Fetcher { fileLen = file.length(); } - if(fileLen >= length){ + if(fileLen == length){ IOUtils.cleanup(LOG, fc, raf); - + finishTime = System.currentTimeMillis(); + state = TajoProtos.FetcherState.FETCH_FINISHED; } } } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) + throws Exception { + if (e.getCause() instanceof ReadTimeoutException) { + LOG.warn(e.getCause()); + } else { + LOG.error("Fetch failed :", e.getCause()); + } + + if(ctx.getChannel().isConnected()){ + ctx.getChannel().close().setFailure(e.getCause()); + } + + // this fetching will be retry + IOUtils.cleanup(LOG, fc, raf); + finishTime = System.currentTimeMillis(); + state = TajoProtos.FetcherState.FETCH_FAILED; + } } class HttpClientPipelineFactory implements @@ -245,8 +288,14 @@ public class Fetcher { public ChannelPipeline getPipeline() throws Exception { ChannelPipeline pipeline = pipeline(); - pipeline.addLast("codec", new HttpClientCodec()); + timer = new HashedWheelTimer(); + + int maxChunkSize = conf.getIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_CHUNK_MAX_SIZE); + int readTimeout = conf.getIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_READ_TIMEOUT); + + pipeline.addLast("codec", new HttpClientCodec(4096, 8192, maxChunkSize)); pipeline.addLast("inflater", new HttpContentDecompressor()); + pipeline.addLast("timeout", new ReadTimeoutHandler(timer, readTimeout)); pipeline.addLast("handler", new HttpClientHandler(file)); return pipeline; } http://git-wip-us.apache.org/repos/asf/tajo/blob/01857dad/tajo-core/src/main/java/org/apache/tajo/worker/Task.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java index a960f69..9350838 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java @@ -576,23 +576,25 @@ public class Task { private class FetchRunner implements Runnable { private final TaskAttemptContext ctx; private final Fetcher fetcher; + private int maxRetryNum; public FetchRunner(TaskAttemptContext ctx, Fetcher fetcher) { this.ctx = ctx; this.fetcher = fetcher; + this.maxRetryNum = systemConf.getIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_READ_RETRY_MAX_NUM); } @Override public void run() { int retryNum = 0; - int maxRetryNum = 5; - int retryWaitTime = 1000; + int retryWaitTime = 1000; //sec try { // for releasing fetch latch while(!killed && retryNum < maxRetryNum) { if (retryNum > 0) { try { Thread.sleep(retryWaitTime); + retryWaitTime = Math.min(10 * 1000, retryWaitTime * 2); // max 10 seconds } catch (InterruptedException e) { LOG.error(e); } @@ -600,7 +602,7 @@ public class Task { } try { File fetched = fetcher.get(); - if (fetched != null) { + if (fetcher.getState() == TajoProtos.FetcherState.FETCH_FINISHED && fetched != null) { break; } } catch (IOException e) { @@ -609,11 +611,15 @@ public class Task { retryNum++; } } finally { - fetcherFinished(ctx); - } - - if (retryNum == maxRetryNum) { - LOG.error("ERROR: the maximum retry (" + retryNum + ") on the fetch exceeded (" + fetcher.getURI() + ")"); + if(fetcher.getState() == TajoProtos.FetcherState.FETCH_FINISHED){ + fetcherFinished(ctx); + } else { + if (retryNum == maxRetryNum) { + LOG.error("ERROR: the maximum retry (" + retryNum + ") on the fetch exceeded (" + fetcher.getURI() + ")"); + } + aborted = true; // retry queryUnit + ctx.getFetchLatch().countDown(); + } } } } @@ -674,7 +680,7 @@ public class Task { storeDir.mkdirs(); } storeFile = new File(storeDir, "in_" + i); - Fetcher fetcher = new Fetcher(uri, storeFile, channelFactory); + Fetcher fetcher = new Fetcher(systemConf, uri, storeFile, channelFactory); runnerList.add(fetcher); i++; } http://git-wip-us.apache.org/repos/asf/tajo/blob/01857dad/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java b/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java index c933294..82d662b 100644 --- a/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java +++ b/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java @@ -18,70 +18,87 @@ package org.apache.tajo.worker; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.fs.*; +import org.apache.tajo.QueryId; +import org.apache.tajo.QueryIdFactory; import org.apache.tajo.TajoProtos; import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.pullserver.TajoPullServerService; import org.apache.tajo.rpc.RpcChannelFactory; import org.apache.tajo.util.CommonTestingUtil; -import org.apache.tajo.worker.dataserver.HttpDataServer; -import org.apache.tajo.worker.dataserver.retriever.DataRetriever; -import org.apache.tajo.worker.dataserver.retriever.DirectoryRetriever; import org.jboss.netty.channel.socket.ClientSocketChannelFactory; +import org.junit.After; import org.junit.Before; import org.junit.Test; import java.io.File; -import java.io.FileWriter; import java.io.IOException; -import java.net.InetSocketAddress; import java.net.URI; import java.util.Random; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; public class TestFetcher { private String TEST_DATA = "target/test-data/TestFetcher"; private String INPUT_DIR = TEST_DATA+"/in/"; private String OUTPUT_DIR = TEST_DATA+"/out/"; + private TajoConf conf = new TajoConf(); + private TajoPullServerService pullServerService; + private ClientSocketChannelFactory channelFactory; @Before public void setUp() throws Exception { CommonTestingUtil.getTestDir(TEST_DATA); CommonTestingUtil.getTestDir(INPUT_DIR); CommonTestingUtil.getTestDir(OUTPUT_DIR); + conf.setVar(TajoConf.ConfVars.WORKER_TEMPORAL_DIR, INPUT_DIR); + conf.setIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_READ_TIMEOUT, 1); + conf.setIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_CHUNK_MAX_SIZE, 127); + + pullServerService = new TajoPullServerService(); + pullServerService.init(conf); + pullServerService.start(); + + channelFactory = RpcChannelFactory.createClientChannelFactory("Fetcher", 1); + } + + @After + public void tearDown(){ + pullServerService.stop(); + channelFactory.releaseExternalResources(); } @Test public void testGet() throws IOException { Random rnd = new Random(); - FileWriter writer = new FileWriter(INPUT_DIR + "data"); - String data; + QueryId queryId = QueryIdFactory.NULL_QUERY_ID; + String sid = "1"; + String ta = "1_0"; + String partId = "1"; + + String dataPath = INPUT_DIR + queryId.toString() + "/output"+ "/" + sid + "/" +ta + "/output/" + partId; + String params = String.format("qid=%s&sid=%s&p=%s&type=%s&ta=%s", queryId, sid, partId, "h", ta); + + Path inputPath = new Path(dataPath); + FSDataOutputStream stream = LocalFileSystem.get(conf).create(inputPath, true); for (int i = 0; i < 100; i++) { - data = ""+rnd.nextInt(); - writer.write(data); + String data = ""+rnd.nextInt(); + stream.write(data.getBytes()); } - writer.flush(); - writer.close(); - - DataRetriever ret = new DirectoryRetriever(INPUT_DIR); - HttpDataServer server = new HttpDataServer( - NetUtils.createSocketAddr("127.0.0.1:0"), ret); - server.start(); - InetSocketAddress addr = server.getBindAddress(); - - URI uri = URI.create("http://127.0.0.1:"+addr.getPort() + "/data"); - ClientSocketChannelFactory channelFactory = RpcChannelFactory.createClientChannelFactory("Fetcher", 1); - Fetcher fetcher = new Fetcher(uri, new File(OUTPUT_DIR + "data"), channelFactory); - fetcher.get(); - server.stop(); - + stream.flush(); + stream.close(); + + URI uri = URI.create("http://127.0.0.1:" + pullServerService.getPort() + "/?" + params); + final Fetcher fetcher = new Fetcher(conf, uri, new File(OUTPUT_DIR + "data"), channelFactory); + assertNotNull(fetcher.get()); + FileSystem fs = FileSystem.getLocal(new TajoConf()); - FileStatus inStatus = fs.getFileStatus(new Path(INPUT_DIR, "data")); + FileStatus inStatus = fs.getFileStatus(inputPath); FileStatus outStatus = fs.getFileStatus(new Path(OUTPUT_DIR, "data")); + assertEquals(inStatus.getLen(), outStatus.getLen()); + assertEquals(TajoProtos.FetcherState.FETCH_FINISHED, fetcher.getState()); } @Test @@ -96,29 +113,86 @@ public class TestFetcher { @Test public void testStatus() throws Exception { Random rnd = new Random(); - FileWriter writer = new FileWriter(INPUT_DIR + "data"); - String data; + QueryId queryId = QueryIdFactory.NULL_QUERY_ID; + String sid = "1"; + String ta = "1_0"; + String partId = "1"; + + String dataPath = INPUT_DIR + queryId.toString() + "/output"+ "/" + sid + "/" +ta + "/output/" + partId; + String params = String.format("qid=%s&sid=%s&p=%s&type=%s&ta=%s", queryId, sid, partId, "h", ta); + + FSDataOutputStream stream = LocalFileSystem.get(conf).create(new Path(dataPath), true); for (int i = 0; i < 100; i++) { - data = ""+rnd.nextInt(); - writer.write(data); + String data = ""+rnd.nextInt(); + stream.write(data.getBytes()); } - writer.flush(); - writer.close(); + stream.flush(); + stream.close(); + + URI uri = URI.create("http://127.0.0.1:" + pullServerService.getPort() + "/?" + params); + final Fetcher fetcher = new Fetcher(conf, uri, new File(OUTPUT_DIR + "data"), channelFactory); + assertEquals(TajoProtos.FetcherState.FETCH_INIT, fetcher.getState()); - DataRetriever ret = new DirectoryRetriever(INPUT_DIR); - final HttpDataServer server = new HttpDataServer( - NetUtils.createSocketAddr("127.0.0.1:0"), ret); - server.start(); - InetSocketAddress addr = server.getBindAddress(); + fetcher.get(); + assertEquals(TajoProtos.FetcherState.FETCH_FINISHED, fetcher.getState()); + } + + @Test + public void testNoContentFetch() throws Exception { - URI uri = URI.create("http://127.0.0.1:"+addr.getPort() + "/data"); - ClientSocketChannelFactory channelFactory = RpcChannelFactory.createClientChannelFactory("Fetcher", 1); + QueryId queryId = QueryIdFactory.NULL_QUERY_ID; + String sid = "1"; + String ta = "1_0"; + String partId = "1"; - final Fetcher fetcher = new Fetcher(uri, new File(OUTPUT_DIR + "data"), channelFactory); + String dataPath = INPUT_DIR + queryId.toString() + "/output"+ "/" + sid + "/" +ta + "/output/" + partId; + String params = String.format("qid=%s&sid=%s&p=%s&type=%s&ta=%s", queryId, sid, partId, "h", ta); + + Path inputPath = new Path(dataPath); + if(LocalFileSystem.get(conf).exists(inputPath)){ + LocalFileSystem.get(conf).delete(new Path(dataPath), true); + } + + FSDataOutputStream stream = LocalFileSystem.get(conf).create(new Path(dataPath).getParent(), true); + stream.close(); + + URI uri = URI.create("http://127.0.0.1:" + pullServerService.getPort() + "/?" + params); + final Fetcher fetcher = new Fetcher(conf, uri, new File(OUTPUT_DIR + "data"), channelFactory); assertEquals(TajoProtos.FetcherState.FETCH_INIT, fetcher.getState()); fetcher.get(); assertEquals(TajoProtos.FetcherState.FETCH_FINISHED, fetcher.getState()); - server.stop(); + } + + @Test + public void testFailureStatus() throws Exception { + Random rnd = new Random(); + + QueryId queryId = QueryIdFactory.NULL_QUERY_ID; + String sid = "1"; + String ta = "1_0"; + String partId = "1"; + + String dataPath = INPUT_DIR + queryId.toString() + "/output"+ "/" + sid + "/" +ta + "/output/" + partId; + + //TajoPullServerService will be throws BAD_REQUEST by Unknown shuffle type + String shuffleType = "x"; + String params = String.format("qid=%s&sid=%s&p=%s&type=%s&ta=%s", queryId, sid, partId, shuffleType, ta); + + FSDataOutputStream stream = LocalFileSystem.get(conf).create(new Path(dataPath), true); + + for (int i = 0; i < 100; i++) { + String data = params + rnd.nextInt(); + stream.write(data.getBytes()); + } + stream.flush(); + stream.close(); + + URI uri = URI.create("http://127.0.0.1:" + pullServerService.getPort() + "/?" + params); + final Fetcher fetcher = new Fetcher(conf, uri, new File(OUTPUT_DIR + "data"), channelFactory); + assertEquals(TajoProtos.FetcherState.FETCH_INIT, fetcher.getState()); + + fetcher.get(); + assertEquals(TajoProtos.FetcherState.FETCH_FAILED, fetcher.getState()); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/01857dad/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java ---------------------------------------------------------------------- diff --git a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java index cc3cb2e..373642b 100644 --- a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java +++ b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java @@ -299,9 +299,11 @@ public class TajoPullServerService extends AbstractService { if (sslFactory != null) { pipeline.addLast("ssl", new SslHandler(sslFactory.createSSLEngine())); } - pipeline.addLast("decoder", new HttpRequestDecoder()); + + int maxChunkSize = getConfig().getInt(ConfVars.SHUFFLE_FETCHER_CHUNK_MAX_SIZE.varname, + ConfVars.SHUFFLE_FETCHER_CHUNK_MAX_SIZE.defaultIntVal); + pipeline.addLast("codec", new HttpServerCodec(4096, 8192, maxChunkSize)); pipeline.addLast("aggregator", new HttpChunkAggregator(1 << 16)); - pipeline.addLast("encoder", new HttpResponseEncoder()); pipeline.addLast("chunking", new ChunkedWriteHandler()); pipeline.addLast("shuffle", PullServer); return pipeline; @@ -319,11 +321,14 @@ public class TajoPullServerService extends AbstractService { new LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname); private int port; - public PullServer(Configuration conf) { + public PullServer(Configuration conf) throws IOException { this.conf = conf; // indexCache = new IndexCache(new JobConf(conf)); this.port = conf.getInt(ConfVars.PULLSERVER_PORT.varname, ConfVars.PULLSERVER_PORT.defaultIntVal); + + // init local temporal dir + lDirAlloc.getAllLocalPathsToRead(".", conf); } public void setPort(int port) { @@ -402,9 +407,13 @@ public class TajoPullServerService extends AbstractService { // if a subquery requires a range shuffle if (shuffleType.equals("r")) { String ta = taskIds.get(0); + if(!lDirAlloc.ifExists(queryBaseDir + "/" + sid + "/" + ta + "/output/", conf)){ + LOG.warn(e); + sendError(ctx, NO_CONTENT); + return; + } Path path = localFS.makeQualified( lDirAlloc.getLocalPathToRead(queryBaseDir + "/" + sid + "/" + ta + "/output/", conf)); - String startKey = params.get("start").get(0); String endKey = params.get("end").get(0); boolean last = params.get("final") != null; @@ -424,15 +433,20 @@ public class TajoPullServerService extends AbstractService { // if a subquery requires a hash shuffle } else if (shuffleType.equals("h")) { for (String ta : taskIds) { + if (!lDirAlloc.ifExists(queryBaseDir + "/" + sid + "/" + ta + "/output/" + partId, conf)) { + LOG.warn(e); + sendError(ctx, NO_CONTENT); + return; + } Path path = localFS.makeQualified( - lDirAlloc.getLocalPathToRead(queryBaseDir + "/" + sid + "/" + - ta + "/output/" + partId, conf)); + lDirAlloc.getLocalPathToRead(queryBaseDir + "/" + sid + "/" + ta + "/output/" + partId, conf)); File file = new File(path.toUri()); FileChunk chunk = new FileChunk(file, 0, file.length()); chunks.add(chunk); } } else { LOG.error("Unknown shuffle type: " + shuffleType); + sendError(ctx, "Unknown shuffle type:" + shuffleType, BAD_REQUEST); return; }
