Repository: tajo Updated Branches: refs/heads/master 0f3412a74 -> 242d6ad65
TAJO-949: PullServer does not release files, when a channel throws an internal exception. (jinho) Closes #76 Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/242d6ad6 Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/242d6ad6 Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/242d6ad6 Branch: refs/heads/master Commit: 242d6ad656685b82f2c552aa09abe862107ae366 Parents: 0f3412a Author: jinossy <[email protected]> Authored: Tue Aug 5 17:14:47 2014 +0900 Committer: jinossy <[email protected]> Committed: Tue Aug 5 17:14:47 2014 +0900 ---------------------------------------------------------------------- CHANGES | 3 ++ .../java/org/apache/tajo/conf/TajoConf.java | 6 +-- .../apache/tajo/engine/function/math/Round.java | 2 +- .../querymaster/QueryMasterManagerService.java | 5 +- .../java/org/apache/tajo/worker/Fetcher.java | 31 +++++++---- .../main/java/org/apache/tajo/worker/Task.java | 2 +- .../org/apache/tajo/worker/TestFetcher.java | 29 ++++++++++- .../tajo/pullserver/TajoPullServerService.java | 54 ++++++++++---------- 8 files changed, 87 insertions(+), 45 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/242d6ad6/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index be71bf4..e1d6d03 100644 --- a/CHANGES +++ b/CHANGES @@ -105,6 +105,9 @@ Release 0.9.0 - unreleased (Hyoungjun Kim via hyunsik) BUG FIXES + + TAJO-949: PullServer does not release files, when a channel throws + an internal exception. (jinho) TAJO-975: alias name which is the same to existing column name may cause NPE during PPD. 
(hyunsik) http://git-wip-us.apache.org/repos/asf/tajo/blob/242d6ad6/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java index d5e8bc4..b75530b 100644 --- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java +++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java @@ -217,9 +217,9 @@ public class TajoConf extends Configuration { SHUFFLE_SSL_ENABLED_KEY("tajo.pullserver.ssl.enabled", false), SHUFFLE_FILE_FORMAT("tajo.shuffle.file-format", "RAW"), SHUFFLE_FETCHER_PARALLEL_EXECUTION_MAX_NUM("tajo.shuffle.fetcher.parallel-execution.max-num", 2), - SHUFFLE_FETCHER_CHUNK_MAX_SIZE("tajo.shuffle.fetcher.chunk.max-size", 8192 * 8), - SHUFFLE_FETCHER_READ_TIMEOUT("tajo.shuffle.fetcher.read.timeout-sec", 5), - SHUFFLE_FETCHER_READ_RETRY_MAX_NUM("tajo.shuffle.fetcher.read.retry.max-num", 5), + SHUFFLE_FETCHER_CHUNK_MAX_SIZE("tajo.shuffle.fetcher.chunk.max-size", 8192), + SHUFFLE_FETCHER_READ_TIMEOUT("tajo.shuffle.fetcher.read.timeout-sec", 120), + SHUFFLE_FETCHER_READ_RETRY_MAX_NUM("tajo.shuffle.fetcher.read.retry.max-num", 20), ////////////////////////////////// // Storage Configuration http://git-wip-us.apache.org/repos/asf/tajo/blob/242d6ad6/tajo-core/src/main/java/org/apache/tajo/engine/function/math/Round.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/function/math/Round.java b/tajo-core/src/main/java/org/apache/tajo/engine/function/math/Round.java index e457791..cdcb70a 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/function/math/Round.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/function/math/Round.java @@ -42,7 +42,7 @@ import org.apache.tajo.storage.Tuple; paramTypes = {@ParamTypes(paramTypes = {TajoDataTypes.Type.FLOAT4}), 
@ParamTypes(paramTypes = {TajoDataTypes.Type.FLOAT8}), @ParamTypes(paramTypes = {TajoDataTypes.Type.INT4}), - @ParamTypes(paramTypes = {TajoDataTypes.Type.INT8}), + @ParamTypes(paramTypes = {TajoDataTypes.Type.INT8}) } ) public class Round extends GeneralFunction { http://git-wip-us.apache.org/repos/asf/tajo/blob/242d6ad6/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java index 826052d..f52d143 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java @@ -204,7 +204,10 @@ public class QueryMasterManagerService extends CompositeService try { QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask( new QueryId(report.getId().getQueryUnitId().getExecutionBlockId().getQueryId())); - queryMasterTask.getEventHandler().handle(new TaskCompletionEvent(report)); + // queryMasterTask is null if the query master was terminated by an internal error before this task finished + if (queryMasterTask != null) { + queryMasterTask.getEventHandler().handle(new TaskCompletionEvent(report)); + } done.run(TajoWorker.TRUE_PROTO); } catch (Exception e) { LOG.error(e.getMessage(), e); http://git-wip-us.apache.org/repos/asf/tajo/blob/242d6ad6/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java b/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java index 2aa2875..1b95238 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java @@ -116,9 +116,9 @@
public class Fetcher { public File get() throws IOException { this.startTime = System.currentTimeMillis(); this.state = TajoProtos.FetcherState.FETCH_FETCHING; - + ChannelFuture future = null; try { - ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port)); + future = bootstrap.connect(new InetSocketAddress(host, port)); // Wait until the connection attempt succeeds or fails. Channel channel = future.awaitUninterruptibly().getChannel(); @@ -145,10 +145,13 @@ public class Fetcher { channelFuture.addListener(ChannelFutureListener.CLOSE); - // Close the channel to exit. - future.getChannel().close(); return file; } finally { + if(future != null){ + // Close the channel to exit. + future.getChannel().close(); + } + this.finishTime = System.currentTimeMillis(); LOG.info("Status: " + getState() + ", URI:" + uri); if (timer != null) { @@ -249,7 +252,6 @@ public class Fetcher { } if(fileLen == length){ - IOUtils.cleanup(LOG, fc, raf); finishTime = System.currentTimeMillis(); state = TajoProtos.FetcherState.FETCH_FINISHED; } @@ -265,15 +267,26 @@ public class Fetcher { LOG.error("Fetch failed :", e.getCause()); } - if(ctx.getChannel().isConnected()){ - ctx.getChannel().close().setFailure(e.getCause()); - } - // this fetching will be retry IOUtils.cleanup(LOG, fc, raf); + if(ctx.getChannel().isConnected()){ + ctx.getChannel().close(); + } finishTime = System.currentTimeMillis(); state = TajoProtos.FetcherState.FETCH_FAILED; } + + @Override + public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { + super.channelDisconnected(ctx, e); + + if(getState() != TajoProtos.FetcherState.FETCH_FINISHED){ + //channel is closed, but cannot complete fetcher + finishTime = System.currentTimeMillis(); + state = TajoProtos.FetcherState.FETCH_FAILED; + } + IOUtils.cleanup(LOG, fc, raf); + } } class HttpClientPipelineFactory implements 
http://git-wip-us.apache.org/repos/asf/tajo/blob/242d6ad6/tajo-core/src/main/java/org/apache/tajo/worker/Task.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java index 3a4536a..195b35e 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java @@ -626,7 +626,7 @@ public class Task { if (fetcher.getState() == TajoProtos.FetcherState.FETCH_FINISHED && fetched != null) { break; } - } catch (IOException e) { + } catch (Throwable e) { LOG.error("Fetch failed: " + fetcher.getURI(), e); } retryNum++; http://git-wip-us.apache.org/repos/asf/tajo/blob/242d6ad6/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java b/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java index 82d662b..c13842b 100644 --- a/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java +++ b/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java @@ -36,8 +36,7 @@ import java.io.IOException; import java.net.URI; import java.util.Random; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.*; public class TestFetcher { private String TEST_DATA = "target/test-data/TestFetcher"; @@ -195,4 +194,30 @@ public class TestFetcher { fetcher.get(); assertEquals(TajoProtos.FetcherState.FETCH_FAILED, fetcher.getState()); } + + @Test + public void testServerFailure() throws Exception { + QueryId queryId = QueryIdFactory.NULL_QUERY_ID; + String sid = "1"; + String ta = "1_0"; + String partId = "1"; + + String dataPath = INPUT_DIR + queryId.toString() + "/output"+ "/" + sid + "/" +ta + "/output/" + partId; + String params = 
String.format("qid=%s&sid=%s&p=%s&type=%s&ta=%s", queryId, sid, partId, "h", ta); + + URI uri = URI.create("http://127.0.0.1:" + pullServerService.getPort() + "/?" + params); + final Fetcher fetcher = new Fetcher(conf, uri, new File(OUTPUT_DIR + "data"), channelFactory); + assertEquals(TajoProtos.FetcherState.FETCH_INIT, fetcher.getState()); + + pullServerService.stop(); + + boolean failure = false; + try{ + fetcher.get(); + } catch (Throwable e){ + failure = true; + } + assertTrue(failure); + assertEquals(TajoProtos.FetcherState.FETCH_FAILED, fetcher.getState()); + } } http://git-wip-us.apache.org/repos/asf/tajo/blob/242d6ad6/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java ---------------------------------------------------------------------- diff --git a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java index 12cd1a3..3b0ee1f 100644 --- a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java +++ b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java @@ -57,7 +57,6 @@ import org.jboss.netty.buffer.ChannelBuffers; import org.jboss.netty.channel.*; import org.jboss.netty.channel.group.ChannelGroup; import org.jboss.netty.channel.group.DefaultChannelGroup; -import org.jboss.netty.handler.codec.frame.TooLongFrameException; import org.jboss.netty.handler.codec.http.*; import org.jboss.netty.handler.ssl.SslHandler; import org.jboss.netty.handler.stream.ChunkedWriteHandler; @@ -491,27 +490,33 @@ public class TajoPullServerService extends AbstractService { private ChannelFuture sendFile(ChannelHandlerContext ctx, Channel ch, FileChunk file) throws IOException { - RandomAccessFile spill; + RandomAccessFile spill = null; + ChannelFuture writeFuture; try { spill = new RandomAccessFile(file.getFile(), "r"); + if 
(ch.getPipeline().get(SslHandler.class) == null) { + final FadvisedFileRegionWrapper filePart = new FadvisedFileRegionWrapper(spill, + file.startOffset, file.length(), manageOsCache, readaheadLength, + readaheadPool, file.getFile().getAbsolutePath()); + writeFuture = ch.write(filePart); + writeFuture.addListener(new FileCloseListener(filePart)); + } else { + // HTTPS cannot be done with zero copy. + final FadvisedChunkedFile chunk = new FadvisedChunkedFile(spill, + file.startOffset, file.length, sslFileBufferSize, + manageOsCache, readaheadLength, readaheadPool, + file.getFile().getAbsolutePath()); + writeFuture = ch.write(chunk); + } } catch (FileNotFoundException e) { LOG.info(file.getFile() + " not found"); return null; - } - ChannelFuture writeFuture; - if (ch.getPipeline().get(SslHandler.class) == null) { - final FadvisedFileRegionWrapper filePart = new FadvisedFileRegionWrapper(spill, - file.startOffset, file.length(), manageOsCache, readaheadLength, - readaheadPool, file.getFile().getAbsolutePath()); - writeFuture = ch.write(filePart); - writeFuture.addListener(new FileCloseListener(filePart)); - } else { - // HTTPS cannot be done with zero copy. 
- final FadvisedChunkedFile chunk = new FadvisedChunkedFile(spill, - file.startOffset, file.length, sslFileBufferSize, - manageOsCache, readaheadLength, readaheadPool, - file.getFile().getAbsolutePath()); - writeFuture = ch.write(chunk); + } catch (Throwable e) { + if (spill != null) { + // close the file opened above so it is not leaked + spill.close(); + } + return null; } metrics.shuffleConnections.incr(); metrics.shuffleOutputBytes.incr(file.length); // optimistic @@ -537,17 +542,10 @@ public class TajoPullServerService extends AbstractService { @Override public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception { - Channel ch = e.getChannel(); - Throwable cause = e.getCause(); - if (cause instanceof TooLongFrameException) { - sendError(ctx, BAD_REQUEST); - return; - } - - LOG.error("PullServer error: ", cause); - if (ch.isConnected()) { - LOG.error("PullServer error " + e); - sendError(ctx, INTERNAL_SERVER_ERROR); + LOG.error(e.getCause().getMessage(), e.getCause()); + // unless the channel is closed here, the files opened for this request are never closed + if (ctx.getChannel().isConnected()){ + ctx.getChannel().close(); } } }
