Repository: tajo Updated Branches: refs/heads/master de1b7d42d -> 8f00bf743
TAJO-1869: Incorrect result when sorting table with small files. Closes #769 Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/8f00bf74 Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/8f00bf74 Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/8f00bf74 Branch: refs/heads/master Commit: 8f00bf7433e7dd99d843e9c97894bdabd81ccb50 Parents: de1b7d4 Author: Jinho Kim <[email protected]> Authored: Mon Sep 21 10:34:06 2015 +0900 Committer: Jinho Kim <[email protected]> Committed: Mon Sep 21 10:34:06 2015 +0900 ---------------------------------------------------------------------- CHANGES | 2 + .../apache/tajo/engine/query/TestSortQuery.java | 19 ++- .../dataset/TestSortQuery/table3/table1.tbl | 1 + .../dataset/TestSortQuery/table3/table2.tbl | 1 + .../dataset/TestSortQuery/table3/table3.tbl | 1 + .../dataset/TestSortQuery/table3/table4.tbl | 1 + .../dataset/TestSortQuery/table3/table5.tbl | 1 + .../create_table_with_unique_small_dataset.sql | 1 + .../queries/TestSortQuery/testOutOfScope.sql | 1 + .../results/TestSortQuery/testOutOfScope.result | 7 ++ .../planner/physical/ExternalSortExec.java | 4 +- .../tajo/worker/ExecutionBlockContext.java | 17 +-- .../java/org/apache/tajo/worker/Fetcher.java | 2 - .../apache/tajo/worker/TaskAttemptContext.java | 5 +- .../java/org/apache/tajo/worker/TaskImpl.java | 111 +++++++---------- .../tajo/pullserver/TajoPullServerService.java | 122 +++++++++++-------- .../org/apache/tajo/rpc/RpcClientManager.java | 1 - 17 files changed, 159 insertions(+), 138 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/8f00bf74/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index eacfbe1..a2496c1 100644 --- a/CHANGES +++ b/CHANGES @@ -273,6 +273,8 @@ Release 0.11.0 - unreleased BUG FIXES + TAJO-1869: Incorrect result when sorting table with small files. (jinho) + TAJO-1846: Python temp directory path should be selected differently based on user platform. (Contributed by Dongkyu Hwangbo, Committed by jihoon) http://git-wip-us.apache.org/repos/asf/tajo/blob/8f00bf74/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestSortQuery.java ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestSortQuery.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestSortQuery.java index 9e632ab..6d6a44c 100644 --- a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestSortQuery.java +++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestSortQuery.java @@ -18,10 +18,7 @@ package org.apache.tajo.engine.query; -import org.apache.tajo.IntegrationTest; -import org.apache.tajo.QueryTestCaseBase; -import org.apache.tajo.TajoConstants; -import org.apache.tajo.TajoTestingCluster; +import org.apache.tajo.*; import org.apache.tajo.catalog.Schema; import org.apache.tajo.common.TajoDataTypes.Type; import org.apache.tajo.conf.TajoConf.ConfVars; @@ -374,4 +371,18 @@ public class TestSortQuery extends QueryTestCaseBase { public final void testSubQuerySortAfterGroupMultiBlocks() throws Exception { runSimpleTests(); } + + @Test + public final void testOutOfScope() throws Exception { + executeDDL("create_table_with_unique_small_dataset.sql", "table3"); + // table has 5 files + testingCluster.setAllTajoDaemonConfValue(ConfVars.$TEST_MIN_TASK_NUM.varname, "5"); + try { + ResultSet res = executeQuery(); + assertResultSet(res); + cleanupQuery(res); + } finally { + testingCluster.setAllTajoDaemonConfValue(ConfVars.$TEST_MIN_TASK_NUM.varname, "0"); + } + } } http://git-wip-us.apache.org/repos/asf/tajo/blob/8f00bf74/tajo-core-tests/src/test/resources/dataset/TestSortQuery/table3/table1.tbl ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/resources/dataset/TestSortQuery/table3/table1.tbl b/tajo-core-tests/src/test/resources/dataset/TestSortQuery/table3/table1.tbl new file mode 100644 index 0000000..d9374de --- /dev/null +++ b/tajo-core-tests/src/test/resources/dataset/TestSortQuery/table3/table1.tbl @@ -0,0 +1 @@ +A,1 http://git-wip-us.apache.org/repos/asf/tajo/blob/8f00bf74/tajo-core-tests/src/test/resources/dataset/TestSortQuery/table3/table2.tbl ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/resources/dataset/TestSortQuery/table3/table2.tbl b/tajo-core-tests/src/test/resources/dataset/TestSortQuery/table3/table2.tbl new file mode 100644 index 0000000..8c6802f --- /dev/null +++ b/tajo-core-tests/src/test/resources/dataset/TestSortQuery/table3/table2.tbl @@ -0,0 +1 @@ +C,3 http://git-wip-us.apache.org/repos/asf/tajo/blob/8f00bf74/tajo-core-tests/src/test/resources/dataset/TestSortQuery/table3/table3.tbl ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/resources/dataset/TestSortQuery/table3/table3.tbl b/tajo-core-tests/src/test/resources/dataset/TestSortQuery/table3/table3.tbl new file mode 100644 index 0000000..8184c74 --- /dev/null +++ b/tajo-core-tests/src/test/resources/dataset/TestSortQuery/table3/table3.tbl @@ -0,0 +1 @@ +B,2 http://git-wip-us.apache.org/repos/asf/tajo/blob/8f00bf74/tajo-core-tests/src/test/resources/dataset/TestSortQuery/table3/table4.tbl ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/resources/dataset/TestSortQuery/table3/table4.tbl b/tajo-core-tests/src/test/resources/dataset/TestSortQuery/table3/table4.tbl new file mode 100644 index 0000000..4a5aa9c --- /dev/null +++ b/tajo-core-tests/src/test/resources/dataset/TestSortQuery/table3/table4.tbl @@ -0,0 +1 @@ +D,4 http://git-wip-us.apache.org/repos/asf/tajo/blob/8f00bf74/tajo-core-tests/src/test/resources/dataset/TestSortQuery/table3/table5.tbl ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/resources/dataset/TestSortQuery/table3/table5.tbl b/tajo-core-tests/src/test/resources/dataset/TestSortQuery/table3/table5.tbl new file mode 100644 index 0000000..bfb50ee --- /dev/null +++ b/tajo-core-tests/src/test/resources/dataset/TestSortQuery/table3/table5.tbl @@ -0,0 +1 @@ +E,5 http://git-wip-us.apache.org/repos/asf/tajo/blob/8f00bf74/tajo-core-tests/src/test/resources/queries/TestSortQuery/create_table_with_unique_small_dataset.sql ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/resources/queries/TestSortQuery/create_table_with_unique_small_dataset.sql b/tajo-core-tests/src/test/resources/queries/TestSortQuery/create_table_with_unique_small_dataset.sql new file mode 100644 index 0000000..bf1fc0f --- /dev/null +++ b/tajo-core-tests/src/test/resources/queries/TestSortQuery/create_table_with_unique_small_dataset.sql @@ -0,0 +1 @@ +create external table testOutOfScope (col1 text, col2 int4) using text with ('text.delimiter'=',') location ${table.path}; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/8f00bf74/tajo-core-tests/src/test/resources/queries/TestSortQuery/testOutOfScope.sql ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/resources/queries/TestSortQuery/testOutOfScope.sql b/tajo-core-tests/src/test/resources/queries/TestSortQuery/testOutOfScope.sql new file mode 100644 index 0000000..3ea7cab --- /dev/null +++ b/tajo-core-tests/src/test/resources/queries/TestSortQuery/testOutOfScope.sql @@ -0,0 +1 @@ +select col1, col2 from testOutOfScope order by col1 desc, col2 desc; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/8f00bf74/tajo-core-tests/src/test/resources/results/TestSortQuery/testOutOfScope.result ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/resources/results/TestSortQuery/testOutOfScope.result b/tajo-core-tests/src/test/resources/results/TestSortQuery/testOutOfScope.result new file mode 100644 index 0000000..a07de56 --- /dev/null +++ b/tajo-core-tests/src/test/resources/results/TestSortQuery/testOutOfScope.result @@ -0,0 +1,7 @@ +col1,col2 +------------------------------- +E,5 +D,4 +C,3 +B,2 +A,1 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/8f00bf74/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java index 4a9b491..42d99bb 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java @@ -18,7 +18,6 @@ package org.apache.tajo.engine.planner.physical; -import com.google.common.annotations.VisibleForTesting; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.LocalDirAllocator; @@ -410,7 +409,8 @@ public class ExternalSortExec extends SortExec { if (frag.getTableName().contains(INTERMEDIATE_FILE_PREFIX)) { localFS.delete(frag.getPath(), true); numDeletedFiles++; - LOG.info("Delete merged intermediate file: " + frag); + + if(LOG.isDebugEnabled()) LOG.debug("Delete merged intermediate file: " + frag); } } info(LOG, numDeletedFiles + " merged intermediate files deleted"); http://git-wip-us.apache.org/repos/asf/tajo/blob/8f00bf74/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java index 6f92344..94bf785 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java @@ -35,10 +35,10 @@ import org.apache.tajo.conf.TajoConf; import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.ipc.QueryMasterProtocol; import org.apache.tajo.plan.serder.PlanProto; +import org.apache.tajo.pullserver.TajoPullServerService; import org.apache.tajo.rpc.*; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; import org.apache.tajo.storage.HashShuffleAppenderManager; -import org.apache.tajo.storage.StorageUtil; import org.apache.tajo.util.Pair; import org.apache.tajo.worker.event.ExecutionBlockErrorEvent; @@ -213,21 +213,12 @@ public class ExecutionBlockContext { } public static Path getBaseOutputDir(ExecutionBlockId executionBlockId) { - Path workDir = - StorageUtil.concatPath( - executionBlockId.getQueryId().toString(), - "output", - String.valueOf(executionBlockId.getId())); - return workDir; + return TajoPullServerService.getBaseOutputDir( + executionBlockId.getQueryId().toString(), String.valueOf(executionBlockId.getId())); } public static Path getBaseInputDir(ExecutionBlockId executionBlockId) { - Path workDir = - StorageUtil.concatPath( - executionBlockId.getQueryId().toString(), - "in", - executionBlockId.toString()); - return workDir; + return TajoPullServerService.getBaseInputDir(executionBlockId.getQueryId().toString(), executionBlockId.toString()); } public ExecutionBlockId getExecutionBlockId() { http://git-wip-us.apache.org/repos/asf/tajo/blob/8f00bf74/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java b/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java index 762278b..71d30cd 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java @@ -125,14 +125,12 @@ public class Fetcher { public FileChunk get() throws IOException { if (useLocalFile) { - LOG.info("Get pseudo fetch from local host"); startTime = System.currentTimeMillis(); finishTime = System.currentTimeMillis(); state = TajoProtos.FetcherState.FETCH_FINISHED; return fileChunk; } - LOG.info("Get real fetch from remote host"); this.startTime = System.currentTimeMillis(); this.state = TajoProtos.FetcherState.FETCH_FETCHING; ChannelFuture future = null; http://git-wip-us.apache.org/repos/asf/tajo/blob/8f00bf74/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java index a2f8c06..228c32a 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java @@ -139,7 +139,10 @@ public class TaskAttemptContext { public void setState(TaskAttemptState state) { this.state = state; - LOG.info("Query status of " + getTaskId() + " is changed to " + state); + + if (LOG.isDebugEnabled()) { + LOG.debug("Query status of " + getTaskId() + " is changed to " + state); + } } public void setDataChannel(DataChannel dataChannel) { http://git-wip-us.apache.org/repos/asf/tajo/blob/8f00bf74/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java index 7b8d06f..4e3a8bf 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java @@ -21,7 +21,6 @@ package org.apache.tajo.worker; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import io.netty.handler.codec.http.QueryStringDecoder; import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -45,12 +44,15 @@ import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.engine.query.TaskRequest; import org.apache.tajo.ipc.QueryMasterProtocol; import org.apache.tajo.master.cluster.WorkerConnectionInfo; -import org.apache.tajo.plan.serder.PlanProto.ShuffleType; -import org.apache.tajo.plan.serder.PlanProto.EnforceProperty; -import org.apache.tajo.plan.serder.PlanProto.EnforceProperty.EnforceType; import org.apache.tajo.plan.function.python.TajoScriptEngine; -import org.apache.tajo.plan.logical.*; +import org.apache.tajo.plan.logical.LogicalNode; +import org.apache.tajo.plan.logical.NodeType; +import org.apache.tajo.plan.logical.ScanNode; +import org.apache.tajo.plan.logical.SortNode; import org.apache.tajo.plan.serder.LogicalNodeDeserializer; +import org.apache.tajo.plan.serder.PlanProto.EnforceProperty; +import org.apache.tajo.plan.serder.PlanProto.EnforceProperty.EnforceType; +import org.apache.tajo.plan.serder.PlanProto.ShuffleType; import org.apache.tajo.plan.util.PlannerUtil; import org.apache.tajo.pullserver.TajoPullServerService; import org.apache.tajo.pullserver.retriever.FileChunk; @@ -569,7 +571,6 @@ public class TaskImpl implements Task { if (name.equals(chunk.getEbId())) { tablet = new FileFragment(name, new Path(chunk.getFile().getPath()), chunk.startOffset(), chunk.length()); listTablets.add(tablet); - LOG.info("One local chunk is added to listTablets"); } } } @@ -670,6 +671,7 @@ public class TaskImpl implements Task { getLocalPathToRead(getTaskAttemptDir(ctx.getTaskId()).toString(), systemConf); int i = 0; + int localStoreChunkCount = 0; File storeDir; File defaultStoreFile; FileChunk storeChunk = null; @@ -687,23 +689,18 @@ public class TaskImpl implements Task { WorkerConnectionInfo conn = executionBlockContext.getWorkerContext().getConnectionInfo(); if (NetUtils.isLocalAddress(address) && conn.getPullServerPort() == uri.getPort()) { - boolean hasError = false; - try { - LOG.info("Try to get local file chunk at local host"); - storeChunk = getLocalStoredFileChunk(uri, systemConf); - } catch (Throwable t) { - hasError = true; - } + + storeChunk = getLocalStoredFileChunk(uri, systemConf); // When a range request is out of range, storeChunk will be NULL. This case is normal state. // So, we should skip and don't need to create storeChunk. - if (storeChunk == null && !hasError) { + if (storeChunk == null || storeChunk.length() == 0) { continue; } - if (storeChunk != null && storeChunk.getFile() != null && storeChunk.startOffset() > -1 - && hasError == false) { + if (storeChunk.getFile() != null && storeChunk.startOffset() > -1) { storeChunk.setFromRemote(false); + localStoreChunkCount++; } else { storeChunk = new FileChunk(defaultStoreFile, 0, -1); storeChunk.setFromRemote(true); @@ -717,12 +714,16 @@ public class TaskImpl implements Task { // represents a complete file. Otherwise, storeChunk may represent a complete file or only a part of it storeChunk.setEbId(f.getName()); Fetcher fetcher = new Fetcher(systemConf, uri, storeChunk); - LOG.info("Create a new Fetcher with storeChunk:" + storeChunk.toString()); runnerList.add(fetcher); i++; + if (LOG.isDebugEnabled()) { + LOG.debug("Create a new Fetcher with storeChunk:" + storeChunk.toString()); + } } } ctx.addFetchPhase(runnerList.size(), new File(inputDir.toString())); + LOG.info("Create shuffle Fetchers local:" + localStoreChunkCount + + ", remote:" + (runnerList.size() - localStoreChunkCount)); return runnerList; } else { return Lists.newArrayList(); @@ -731,56 +732,42 @@ public class TaskImpl implements Task { private FileChunk getLocalStoredFileChunk(URI fetchURI, TajoConf conf) throws IOException { // Parse the URI - LOG.info("getLocalStoredFileChunk starts"); - final Map<String, List<String>> params = new QueryStringDecoder(fetchURI.toString()).parameters(); - final List<String> types = params.get("type"); - final List<String> qids = params.get("qid"); - final List<String> taskIdList = params.get("ta"); - final List<String> stageIds = params.get("sid"); - final List<String> partIds = params.get("p"); - final List<String> offsetList = params.get("offset"); - final List<String> lengthList = params.get("length"); - - if (types == null || stageIds == null || qids == null || partIds == null) { - LOG.error("Invalid URI - Required queryId, type, stage Id, and part id"); - return null; - } - if (qids.size() != 1 && types.size() != 1 || stageIds.size() != 1) { - LOG.error("Invalid URI - Required qids, type, taskIds, stage Id, and part id"); - return null; - } + // Parsing the URL into key-values + final Map<String, List<String>> params = TajoPullServerService.decodeParams(fetchURI.toString()); - String queryId = qids.get(0); - String shuffleType = types.get(0); - String sid = stageIds.get(0); - String partId = partIds.get(0); + String partId = params.get("p").get(0); + String queryId = params.get("qid").get(0); + String shuffleType = params.get("type").get(0); + String sid = params.get("sid").get(0); - if (shuffleType.equals("r") && taskIdList == null) { - LOG.error("Invalid URI - For range shuffle, taskId is required"); - return null; - } - List<String> taskIds = splitMaps(taskIdList); + final List<String> taskIdList = params.get("ta"); + final List<String> offsetList = params.get("offset"); + final List<String> lengthList = params.get("length"); - FileChunk chunk; long offset = (offsetList != null && !offsetList.isEmpty()) ? Long.parseLong(offsetList.get(0)) : -1L; long length = (lengthList != null && !lengthList.isEmpty()) ? Long.parseLong(lengthList.get(0)) : -1L; - LOG.info("PullServer request param: shuffleType=" + shuffleType + ", sid=" + sid + ", partId=" + partId - + ", taskIds=" + taskIdList); + if (LOG.isDebugEnabled()) { + LOG.debug("PullServer request param: shuffleType=" + shuffleType + ", sid=" + sid + ", partId=" + partId + + ", taskIds=" + taskIdList); + } // The working directory of Tajo worker for each query, including stage - String queryBaseDir = queryId.toString() + "/output" + "/" + sid + "/"; + Path queryBaseDir = TajoPullServerService.getBaseOutputDir(queryId, sid); + List<String> taskIds = TajoPullServerService.splitMaps(taskIdList); + FileChunk chunk; // If the stage requires a range shuffle if (shuffleType.equals("r")) { - String ta = taskIds.get(0); - if (!executionBlockContext.getLocalDirAllocator().ifExists(queryBaseDir + ta + "/output/", conf)) { - LOG.warn("Range shuffle - file not exist"); + + Path outputPath = StorageUtil.concatPath(queryBaseDir, taskIds.get(0), "output"); + if (!executionBlockContext.getLocalDirAllocator().ifExists(outputPath.toString(), conf)) { + LOG.warn("Range shuffle - file not exist. " + outputPath); return null; } Path path = executionBlockContext.getLocalFS().makeQualified( - executionBlockContext.getLocalDirAllocator().getLocalPathToRead(queryBaseDir + ta + "/output/", conf)); + executionBlockContext.getLocalDirAllocator().getLocalPathToRead(outputPath.toString(), conf)); String startKey = params.get("start").get(0); String endKey = params.get("end").get(0); boolean last = params.get("final") != null; @@ -794,14 +781,15 @@ public class TaskImpl implements Task { // If the stage requires a hash shuffle or a scattered hash shuffle } else if (shuffleType.equals("h") || shuffleType.equals("s")) { - int partParentId = HashShuffleAppenderManager.getPartParentId(Integer.parseInt(partId), (TajoConf) conf); - String partPath = queryBaseDir + "hash-shuffle/" + partParentId + "/" + partId; - if (!executionBlockContext.getLocalDirAllocator().ifExists(partPath, conf)) { + int partParentId = HashShuffleAppenderManager.getPartParentId(Integer.parseInt(partId), conf); + Path partPath = StorageUtil.concatPath(queryBaseDir, "hash-shuffle", String.valueOf(partParentId), partId); + + if (!executionBlockContext.getLocalDirAllocator().ifExists(partPath.toString(), conf)) { LOG.warn("Hash shuffle or Scattered hash shuffle - file not exist: " + partPath); return null; } Path path = executionBlockContext.getLocalFS().makeQualified( - executionBlockContext.getLocalDirAllocator().getLocalPathToRead(partPath, conf)); + executionBlockContext.getLocalDirAllocator().getLocalPathToRead(partPath.toString(), conf)); File file = new File(path.toUri()); long startPos = (offset >= 0 && length >= 0) ? offset : 0; long readLen = (offset >= 0 && length >= 0) ? length : file.length(); @@ -820,17 +808,6 @@ public class TaskImpl implements Task { return chunk; } - private List<String> splitMaps(List<String> mapq) { - if (null == mapq) { - return null; - } - final List<String> ret = new ArrayList<String>(); - for (String s : mapq) { - Collections.addAll(ret, s.split(",")); - } - return ret; - } - public static Path getTaskAttemptDir(TaskAttemptId quid) { Path workDir = StorageUtil.concatPath(ExecutionBlockContext.getBaseInputDir(quid.getTaskId().getExecutionBlockId()), http://git-wip-us.apache.org/repos/asf/tajo/blob/8f00bf74/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java ---------------------------------------------------------------------- diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java index 59a758f..d6f5a90 100644 --- a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java +++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java @@ -59,11 +59,8 @@ import org.apache.tajo.conf.TajoConf; import org.apache.tajo.conf.TajoConf.ConfVars; import org.apache.tajo.pullserver.retriever.FileChunk; import org.apache.tajo.rpc.NettyUtils; -import org.apache.tajo.storage.HashShuffleAppenderManager; -import org.apache.tajo.storage.RowStoreUtil; +import org.apache.tajo.storage.*; import org.apache.tajo.storage.RowStoreUtil.RowStoreDecoder; -import org.apache.tajo.storage.Tuple; -import org.apache.tajo.storage.TupleComparator; import org.apache.tajo.storage.index.bst.BSTIndex; import java.io.*; @@ -432,17 +429,6 @@ public class TajoPullServerService extends AbstractService { lDirAlloc.getAllLocalPathsToRead(".", conf); } - private List<String> splitMaps(List<String> mapq) { - if (null == mapq) { - return null; - } - final List<String> ret = new ArrayList<String>(); - for (String s : mapq) { - Collections.addAll(ret, s.split(",")); - } - return ret; - } - @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { accepted.add(ctx.channel()); @@ -461,37 +447,30 @@ public class TajoPullServerService extends AbstractService { ProcessingStatus processingStatus = new ProcessingStatus(request.getUri().toString()); processingStatusMap.put(request.getUri().toString(), processingStatus); - // Parsing the URL into key-values - final Map<String, List<String>> params = new QueryStringDecoder(request.getUri()).parameters(); - final List<String> types = params.get("type"); - final List<String> qids = params.get("qid"); - final List<String> taskIdList = params.get("ta"); - final List<String> subQueryIds = params.get("sid"); - final List<String> partIds = params.get("p"); - final List<String> offsetList = params.get("offset"); - final List<String> lengthList = params.get("length"); - if (types == null || subQueryIds == null || qids == null || partIds == null) { - sendError(ctx, "Required queryId, type, subquery Id, and part id", HttpResponseStatus.BAD_REQUEST); - return; + // Parsing the URL into key-values + Map<String, List<String>> params = null; + try { + params = decodeParams(request.getUri()); + } catch (Throwable e) { + sendError(ctx, e.getMessage(), HttpResponseStatus.BAD_REQUEST); } - if (qids.size() != 1 && types.size() != 1 || subQueryIds.size() != 1) { - sendError(ctx, "Required qids, type, taskIds, subquery Id, and part id", HttpResponseStatus.BAD_REQUEST); - return; - } + String partId = params.get("p").get(0); + String queryId = params.get("qid").get(0); + String shuffleType = params.get("type").get(0); + String sid = params.get("sid").get(0); - String partId = partIds.get(0); - String queryId = qids.get(0); - String shuffleType = types.get(0); - String sid = subQueryIds.get(0); + final List<String> taskIdList = params.get("ta"); + final List<String> offsetList = params.get("offset"); + final List<String> lengthList = params.get("length"); long offset = (offsetList != null && !offsetList.isEmpty()) ? Long.parseLong(offsetList.get(0)) : -1L; long length = (lengthList != null && !lengthList.isEmpty()) ? Long.parseLong(lengthList.get(0)) : -1L; List<String> taskIds = splitMaps(taskIdList); - String queryBaseDir = queryId.toString() + "/output"; + Path queryBaseDir = getBaseOutputDir(queryId, sid); if (LOG.isDebugEnabled()) { LOG.debug("PullServer request param: shuffleType=" + shuffleType + ", sid=" + sid + ", partId=" + partId @@ -505,15 +484,13 @@ public class TajoPullServerService extends AbstractService { // if a stage requires a range shuffle if (shuffleType.equals("r")) { - String ta = taskIds.get(0); - String pathString = queryBaseDir + "/" + sid + "/" + ta + "/output/"; - if (!lDirAlloc.ifExists(pathString, conf)) { - LOG.warn(pathString + "does not exist."); + Path outputPath = StorageUtil.concatPath(queryBaseDir, taskIds.get(0), "output"); + if (!lDirAlloc.ifExists(outputPath.toString(), conf)) { + LOG.warn(outputPath + "does not exist."); sendError(ctx, HttpResponseStatus.NO_CONTENT); return; } - Path path = localFS.makeQualified(lDirAlloc.getLocalPathToRead(queryBaseDir + "/" + sid + "/" + ta - + "/output/", conf)); + Path path = localFS.makeQualified(lDirAlloc.getLocalPathToRead(outputPath.toString(), conf)); String startKey = params.get("start").get(0); String endKey = params.get("end").get(0); boolean last = params.get("final") != null; @@ -533,14 +510,14 @@ public class TajoPullServerService extends AbstractService { // if a stage requires a hash shuffle or a scattered hash shuffle } else if (shuffleType.equals("h") || shuffleType.equals("s")) { int partParentId = HashShuffleAppenderManager.getPartParentId(Integer.parseInt(partId), conf); - String partPath = queryBaseDir + "/" + sid + "/hash-shuffle/" + partParentId + "/" + partId; - if (!lDirAlloc.ifExists(partPath, conf)) { + Path partPath = StorageUtil.concatPath(queryBaseDir, "hash-shuffle", String.valueOf(partParentId), partId); + if (!lDirAlloc.ifExists(partPath.toString(), conf)) { LOG.warn("Partition shuffle file not exists: " + partPath); sendError(ctx, HttpResponseStatus.NO_CONTENT); return; } - Path path = localFS.makeQualified(lDirAlloc.getLocalPathToRead(partPath, conf)); + Path path = localFS.makeQualified(lDirAlloc.getLocalPathToRead(partPath.toString(), conf)); File file = new File(path.toUri()); long startPos = (offset >= 0 && length >= 0) ? offset : 0; @@ -683,8 +660,9 @@ public class TajoPullServerService extends AbstractService { Schema keySchema = idxReader.getKeySchema(); TupleComparator comparator = idxReader.getComparator(); - LOG.info("BSTIndex is loaded from disk (" + idxReader.getFirstKey() + ", " - + idxReader.getLastKey()); + if (LOG.isDebugEnabled()) { + LOG.debug("BSTIndex is loaded from disk (" + idxReader.getFirstKey() + ", " + idxReader.getLastKey() + ")"); + } File data = new File(URI.create(outDir.toUri() + "/output")); byte [] startBytes = Base64.decodeBase64(startKey); @@ -776,7 +754,55 @@ public class TajoPullServerService extends AbstractService { idxReader.close(); FileChunk chunk = new FileChunk(data, startOffset, endOffset - startOffset); - LOG.info("Retrieve File Chunk: " + chunk); + + if(LOG.isDebugEnabled()) LOG.debug("Retrieve File Chunk: " + chunk); return chunk; } + + public static List<String> splitMaps(List<String> mapq) { + if (null == mapq) { + return null; + } + final List<String> ret = new ArrayList<String>(); + for (String s : mapq) { + Collections.addAll(ret, s.split(",")); + } + return ret; + } + + public static Map<String, List<String>> decodeParams(String uri) { + final Map<String, List<String>> params = new QueryStringDecoder(uri).parameters(); + final List<String> types = params.get("type"); + final List<String> qids = params.get("qid"); + final List<String> ebIds = params.get("sid"); + final List<String> partIds = params.get("p"); + + if (types == null || ebIds == null || qids == null || partIds == null) { + throw new IllegalArgumentException("invalid params. required :" + params); + } + + if (qids.size() != 1 && types.size() != 1 || ebIds.size() != 1) { + throw new IllegalArgumentException("invalid params. required :" + params); + } + + return params; + } + + public static Path getBaseOutputDir(String queryId, String executionBlockSequenceId) { + Path workDir = + StorageUtil.concatPath( + queryId, + "output", + executionBlockSequenceId); + return workDir; + } + + public static Path getBaseInputDir(String queryId, String executionBlockId) { + Path workDir = + StorageUtil.concatPath( + queryId, + "in", + executionBlockId); + return workDir; + } } http://git-wip-us.apache.org/repos/asf/tajo/blob/8f00bf74/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcClientManager.java ---------------------------------------------------------------------- diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcClientManager.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcClientManager.java index 598c8e8..c801b8a 100644 --- a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcClientManager.java +++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/RpcClientManager.java @@ -199,7 +199,6 @@ public class RpcClientManager { * After it is shutdown it is not possible to reuse it again. */ public static void shutdown() { - close(); NettyUtils.shutdownGracefully(); }
