Repository: tajo Updated Branches: refs/heads/master cf66a3900 -> 3c833e2a8
http://git-wip-us.apache.org/repos/asf/tajo/blob/3c833e2a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java index 0920619..70a3202 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java @@ -162,7 +162,7 @@ public class Task { context.setState(TaskAttemptState.TA_PENDING); LOG.info("=================================="); - LOG.info("* Subquery " + request.getId() + " is initialized"); + LOG.info("* Stage " + request.getId() + " is initialized"); LOG.info("* InterQuery: " + interQuery + (interQuery ? ", Use " + this.shuffleType + " shuffle":"") + ", Fragments (num: " + request.getFragments().size() + ")" + @@ -734,24 +734,24 @@ public class Task { final List<String> types = params.get("type"); final List<String> qids = params.get("qid"); final List<String> taskIdList = params.get("ta"); - final List<String> subQueryIds = params.get("sid"); + final List<String> stageIds = params.get("sid"); final List<String> partIds = params.get("p"); final List<String> offsetList = params.get("offset"); final List<String> lengthList = params.get("length"); - if (types == null || subQueryIds == null || qids == null || partIds == null) { - LOG.error("Invalid URI - Required queryId, type, subquery Id, and part id"); + if (types == null || stageIds == null || qids == null || partIds == null) { + LOG.error("Invalid URI - Required queryId, type, stage Id, and part id"); return null; } - if (qids.size() != 1 && types.size() != 1 || subQueryIds.size() != 1) { - LOG.error("Invalid URI - Required qids, type, taskIds, subquery Id, and part id"); + if (qids.size() != 1 && types.size() != 1 || stageIds.size() != 1) { + LOG.error("Invalid URI - Required qids, type, taskIds, stage Id, and part id"); return null; } String queryId = qids.get(0); String shuffleType = types.get(0); - String sid = subQueryIds.get(0); + String sid = stageIds.get(0); String partId = partIds.get(0); if (shuffleType.equals("r") && taskIdList == null) { @@ -767,10 +767,10 @@ public class Task { LOG.info("PullServer request param: shuffleType=" + shuffleType + ", sid=" + sid + ", partId=" + partId + ", taskIds=" + taskIdList); - // The working directory of Tajo worker for each query, including subquery + // The working directory of Tajo worker for each query, including stage String queryBaseDir = queryId.toString() + "/output" + "/" + sid + "/"; - // If the subquery requires a range shuffle + // If the stage requires a range shuffle if (shuffleType.equals("r")) { String ta = taskIds.get(0); if (!executionBlockContext.getLocalDirAllocator().ifExists(queryBaseDir + ta + "/output/", conf)) { @@ -790,7 +790,7 @@ public class Task { return null; } - // If the subquery requires a hash shuffle or a scattered hash shuffle + // If the stage requires a hash shuffle or a scattered hash shuffle } else if (shuffleType.equals("h") || shuffleType.equals("s")) { int partParentId = HashShuffleAppenderManager.getPartParentId(Integer.parseInt(partId), (TajoConf) conf); String partPath = queryBaseDir + "hash-shuffle/" + partParentId + "/" + partId; http://git-wip-us.apache.org/repos/asf/tajo/blob/3c833e2a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java index 1556a44..3092c47 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java @@ -51,7 +51,7 @@ import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto; /** - * Contains the information about executing subquery. + * Contains the information about executing task attempt. */ public class TaskAttemptContext { private static final Log LOG = LogFactory.getLog(TaskAttemptContext.class); http://git-wip-us.apache.org/repos/asf/tajo/blob/3c833e2a/tajo-core/src/main/resources/webapps/admin/querydetail.jsp ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/resources/webapps/admin/querydetail.jsp b/tajo-core/src/main/resources/webapps/admin/querydetail.jsp index 41b0e8f..099301e 100644 --- a/tajo-core/src/main/resources/webapps/admin/querydetail.jsp +++ b/tajo-core/src/main/resources/webapps/admin/querydetail.jsp @@ -25,7 +25,7 @@ <%@ page import="java.text.SimpleDateFormat" %> <%@ page import="java.util.List" %> <%@ page import="org.apache.tajo.util.history.QueryHistory" %> -<%@ page import="org.apache.tajo.util.history.SubQueryHistory" %> +<%@ page import="org.apache.tajo.util.history.StageHistory" %> <%@ page import="org.apache.tajo.util.history.HistoryReader" %> <% @@ -36,8 +36,8 @@ String startTime = request.getParameter("startTime"); QueryHistory queryHistory = reader.getQueryHistory(queryId, Long.parseLong(startTime)); - List<SubQueryHistory> subQueryHistories = - queryHistory != null ? JSPUtil.sortSubQueryHistory(queryHistory.getSubQueryHistories()) : null; + List<StageHistory> stageHistories = + queryHistory != null ? JSPUtil.sortStageHistories(queryHistory.getStageHistories()) : null; SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); %> @@ -61,34 +61,34 @@ if (queryHistory == null) { <div>No Query history data.</div> <% } else { - if (subQueryHistories == null) { + if (stageHistories == null) { %> - <div>No SubQuery history data.</div> + <div>No Stage history data.</div> <% } else { %> <table width="100%" border="1" class="border_table"> <tr><th>ID</th><th>State</th><th>Started</th><th>Finished</th><th>Running time</th><th>Progress</th><th>Succeeded/Total</th><th>Failed/Killed</th></tr> <% - for(SubQueryHistory eachSubQuery: subQueryHistories) { - String detailLink = "querytasks.jsp?queryId=" + queryId + "&ebid=" + eachSubQuery.getExecutionBlockId() + "&startTime=" + startTime; + for(StageHistory eachStage: stageHistories) { + String detailLink = "querytasks.jsp?queryId=" + queryId + "&ebid=" + eachStage.getExecutionBlockId() + "&startTime=" + startTime; %> <tr> - <td><a href='<%=detailLink%>'><%=eachSubQuery.getExecutionBlockId()%></a></td> - <td><%=eachSubQuery.getState()%></td> - <td><%=df.format(eachSubQuery.getStartTime())%></td> - <td><%=eachSubQuery.getFinishTime() == 0 ? "-" : df.format(eachSubQuery.getFinishTime())%></td> - <td><%=JSPUtil.getElapsedTime(eachSubQuery.getStartTime(), eachSubQuery.getFinishTime())%></td> - <td align='center'><%=JSPUtil.percentFormat(eachSubQuery.getProgress())%>%</td> - <td align='center'><%=eachSubQuery.getSucceededObjectCount()%> / <%=eachSubQuery.getTotalScheduledObjectsCount()%></td> - <td align='center'><%=eachSubQuery.getFailedObjectCount()%> / <%=eachSubQuery.getKilledObjectCount()%></td> + <td><a href='<%=detailLink%>'><%=eachStage.getExecutionBlockId()%></a></td> + <td><%=eachStage.getState()%></td> + <td><%=df.format(eachStage.getStartTime())%></td> + <td><%=eachStage.getFinishTime() == 0 ? "-" : df.format(eachStage.getFinishTime())%></td> + <td><%=JSPUtil.getElapsedTime(eachStage.getStartTime(), eachStage.getFinishTime())%></td> + <td align='center'><%=JSPUtil.percentFormat(eachStage.getProgress())%>%</td> + <td align='center'><%=eachStage.getSucceededObjectCount()%> / <%=eachStage.getTotalScheduledObjectsCount()%></td> + <td align='center'><%=eachStage.getFailedObjectCount()%> / <%=eachStage.getKilledObjectCount()%></td> </tr> <% } //end of for %> </table> <% - } //end of else [if (subQueryHistories == null)] + } //end of else [if (stageHistories == null)] %> <p/> <h3>Applied Session Variables</h3> http://git-wip-us.apache.org/repos/asf/tajo/blob/3c833e2a/tajo-core/src/main/resources/webapps/admin/querytasks.jsp ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/resources/webapps/admin/querytasks.jsp b/tajo-core/src/main/resources/webapps/admin/querytasks.jsp index ed97eff..09d9e2e 100644 --- a/tajo-core/src/main/resources/webapps/admin/querytasks.jsp +++ b/tajo-core/src/main/resources/webapps/admin/querytasks.jsp @@ -28,7 +28,7 @@ <%@ page import="org.apache.tajo.master.TajoMaster" %> <%@ page import="org.apache.tajo.util.history.HistoryReader" %> <%@ page import="org.apache.tajo.util.history.QueryHistory" %> -<%@ page import="org.apache.tajo.util.history.SubQueryHistory" %> +<%@ page import="org.apache.tajo.util.history.StageHistory" %> <%@ page import="org.apache.tajo.master.rm.Worker" %> <%@ page import="java.util.*" %> <%@ page import="org.apache.tajo.util.history.TaskHistory" %> @@ -43,14 +43,14 @@ QueryHistory queryHistory = reader.getQueryHistory(queryId); - List<SubQueryHistory> subQueryHistories = - queryHistory != null ? JSPUtil.sortSubQueryHistory(queryHistory.getSubQueryHistories()) : null; + List<StageHistory> stageHistories = + queryHistory != null ? JSPUtil.sortStageHistories(queryHistory.getStageHistories()) : null; - SubQueryHistory subQuery = null; - if (subQueryHistories != null) { - for (SubQueryHistory eachSubQuery: subQueryHistories) { - if (eachSubQuery.getExecutionBlockId().equals(ebId)) { - subQuery = eachSubQuery; + StageHistory stage = null; + if (stageHistories != null) { + for (StageHistory eachStage: stageHistories) { + if (eachStage.getExecutionBlockId().equals(ebId)) { + stage = eachStage; break; } } @@ -92,12 +92,12 @@ long totalWriteBytes = 0; long totalWriteRows = 0; - if (subQuery != null) { - totalInputBytes = subQuery.getTotalInputBytes(); - totalReadBytes = subQuery.getTotalReadBytes(); - totalReadRows = subQuery.getTotalReadRows(); - totalWriteBytes = subQuery.getTotalWriteBytes(); - totalWriteRows = subQuery.getTotalWriteRows(); + if (stage != null) { + totalInputBytes = stage.getTotalInputBytes(); + totalReadBytes = stage.getTotalReadBytes(); + totalReadRows = stage.getTotalReadRows(); + totalWriteBytes = stage.getTotalWriteBytes(); + totalWriteRows = stage.getTotalWriteRows(); } List<TaskHistory> allTasks = reader.getTaskHistory(queryId, ebId); @@ -150,12 +150,12 @@ <h3><a href='querydetail.jsp?queryId=<%=queryId%>&startTime=<%=startTime%>'><%=ebId.toString()%></a></h3> <hr/> <p/> - <pre style="white-space:pre-wrap;"><%=subQuery.getPlan()%></pre> + <pre style="white-space:pre-wrap;"><%=stage.getPlan()%></pre> <p/> <table border="1" width="100%" class="border_table"> - <tr><td align='right' width='180px'>Status:</td><td><%=subQuery.getState()%></td></tr> - <tr><td align='right'>Started:</td><td><%=df.format(subQuery.getStartTime())%> ~ <%=subQuery.getFinishTime() == 0 ? "-" : df.format(subQuery.getFinishTime())%></td></tr> - <tr><td align='right'># Tasks:</td><td><%=numTasks%> (Local Tasks: <%=subQuery.getHostLocalAssigned()%>, Rack Local Tasks: <%=subQuery.getRackLocalAssigned()%>)</td></tr> + <tr><td align='right' width='180px'>Status:</td><td><%=stage.getState()%></td></tr> + <tr><td align='right'>Started:</td><td><%=df.format(stage.getStartTime())%> ~ <%=stage.getFinishTime() == 0 ? "-" : df.format(stage.getFinishTime())%></td></tr> + <tr><td align='right'># Tasks:</td><td><%=numTasks%> (Local Tasks: <%=stage.getHostLocalAssigned()%>, Rack Local Tasks: <%=stage.getRackLocalAssigned()%>)</td></tr> <tr><td align='right'>Progress:</td><td><%=JSPUtil.percentFormat((float) (totalProgress / numTasks))%>%</td></tr> <tr><td align='right'># Shuffles:</td><td><%=numShuffles%></td></tr> <tr><td align='right'>Input Bytes:</td><td><%=FileUtil.humanReadableByteCount(totalInputBytes, false) + " (" + nf.format(totalInputBytes) + " B)"%></td></tr> http://git-wip-us.apache.org/repos/asf/tajo/blob/3c833e2a/tajo-core/src/main/resources/webapps/worker/querydetail.jsp ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/resources/webapps/worker/querydetail.jsp b/tajo-core/src/main/resources/webapps/worker/querydetail.jsp index ceb1c56..340eb95 100644 --- a/tajo-core/src/main/resources/webapps/worker/querydetail.jsp +++ b/tajo-core/src/main/resources/webapps/worker/querydetail.jsp @@ -31,7 +31,7 @@ <%@ page import="java.util.Map" %> <%@ page import="org.apache.tajo.SessionVars" %> <%@ page import="org.apache.tajo.util.history.QueryHistory" %> -<%@ page import="org.apache.tajo.util.history.SubQueryHistory" %> +<%@ page import="org.apache.tajo.util.history.StageHistory" %> <%@ page import="org.apache.tajo.util.history.HistoryReader" %> <% @@ -61,8 +61,8 @@ return; } - List<SubQueryHistory> subQueryHistories = - queryHistory != null ? JSPUtil.sortSubQueryHistory(queryHistory.getSubQueryHistories()) : null; + List<StageHistory> stageHistories = + queryHistory != null ? JSPUtil.sortStageHistories(queryHistory.getStageHistories()) : null; SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); %> @@ -86,26 +86,26 @@ if (runningQuery && query == null) { if (errorMessage != null && !errorMessage.isEmpty()) { out.write("<p/>Message:<p/><pre>" + errorMessage + "</pre>"); } -} else if (subQueryHistories == null) { - out.write("<p/>Message:<p/><pre>No SubQueries</pre>"); +} else if (stageHistories == null) { + out.write("<p/>Message:<p/><pre>No Stages</pre>"); } else { %> <h3><%=queryId.toString()%> <a href='queryplan.jsp?queryId=<%=queryId%>'>[Query Plan]</a></h3> <table width="100%" border="1" class="border_table"> <tr><th>ID</th><th>State</th><th>Started</th><th>Finished</th><th>Running time</th><th>Progress</th><th>Tasks</th></tr> <% -for(SubQueryHistory eachSubQuery: subQueryHistories) { - eachSubQuery.getSucceededObjectCount(); - String detailLink = "querytasks.jsp?queryId=" + queryId + "&ebid=" + eachSubQuery.getExecutionBlockId(); +for(StageHistory eachStage: stageHistories) { + eachStage.getSucceededObjectCount(); + String detailLink = "querytasks.jsp?queryId=" + queryId + "&ebid=" + eachStage.getExecutionBlockId(); %> <tr> - <td><a href='<%=detailLink%>'><%=eachSubQuery.getExecutionBlockId()%></a></td> - <td><%=eachSubQuery.getState()%></td> - <td><%=df.format(eachSubQuery.getStartTime())%></td> - <td><%=eachSubQuery.getFinishTime() == 0 ? "-" : df.format(eachSubQuery.getFinishTime())%></td> - <td><%=JSPUtil.getElapsedTime(eachSubQuery.getStartTime(), eachSubQuery.getFinishTime())%></td> - <td align='center'><%=JSPUtil.percentFormat(eachSubQuery.getProgress())%>%</td> - <td align='center'><a href='<%=detailLink%>&status=SUCCEEDED'><%=eachSubQuery.getSucceededObjectCount()%></a>/<a href='<%=detailLink%>&status=ALL'><%=eachSubQuery.getTotalScheduledObjectsCount()%></a></td> + <td><a href='<%=detailLink%>'><%=eachStage.getExecutionBlockId()%></a></td> + <td><%=eachStage.getState()%></td> + <td><%=df.format(eachStage.getStartTime())%></td> + <td><%=eachStage.getFinishTime() == 0 ? "-" : df.format(eachStage.getFinishTime())%></td> + <td><%=JSPUtil.getElapsedTime(eachStage.getStartTime(), eachStage.getFinishTime())%></td> + <td align='center'><%=JSPUtil.percentFormat(eachStage.getProgress())%>%</td> + <td align='center'><a href='<%=detailLink%>&status=SUCCEEDED'><%=eachStage.getSucceededObjectCount()%></a>/<a href='<%=detailLink%>&status=ALL'><%=eachStage.getTotalScheduledObjectsCount()%></a></td> </tr> <% } //end of for http://git-wip-us.apache.org/repos/asf/tajo/blob/3c833e2a/tajo-core/src/main/resources/webapps/worker/queryplan.jsp ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/resources/webapps/worker/queryplan.jsp b/tajo-core/src/main/resources/webapps/worker/queryplan.jsp index ec860b9..88de97d 100644 --- a/tajo-core/src/main/resources/webapps/worker/queryplan.jsp +++ b/tajo-core/src/main/resources/webapps/worker/queryplan.jsp @@ -25,7 +25,7 @@ <%@ page import="org.apache.tajo.QueryId" %> <%@ page import="org.apache.tajo.util.TajoIdUtils" %> <%@ page import="org.apache.tajo.master.querymaster.QueryMasterTask" %> -<%@ page import="org.apache.tajo.master.querymaster.SubQuery" %> +<%@ page import="org.apache.tajo.master.querymaster.Stage" %> <%@ page import="org.apache.tajo.engine.planner.global.ExecutionBlock" %> <%@ page import="java.util.*" %> <%@ page import="org.apache.tajo.ExecutionBlockId" %> @@ -46,22 +46,22 @@ Query query = queryMasterTask.getQuery(); - Map<ExecutionBlockId, SubQuery> subQueryMap = new HashMap<ExecutionBlockId, SubQuery>(); + Map<ExecutionBlockId, Stage> stageMap = new HashMap<ExecutionBlockId, Stage>(); - for(SubQuery eachSubQuery: query.getSubQueries()) { - subQueryMap.put(eachSubQuery.getId(), eachSubQuery); + for(Stage eachStage : query.getStages()) { + stageMap.put(eachStage.getId(), eachStage); } - class SubQueryInfo { + class StageInfo { ExecutionBlock executionBlock; - SubQuery subQuery; + Stage stage; ExecutionBlockId parentId; int px; int py; int pos; // 0: mid 1: left 2: right - public SubQueryInfo(ExecutionBlock executionBlock, SubQuery subQuery, ExecutionBlockId parentId, int px, int py, int pos) { + public StageInfo(ExecutionBlock executionBlock, Stage stage, ExecutionBlockId parentId, int px, int py, int pos) { this.executionBlock = executionBlock; - this.subQuery = subQuery; + this.stage = stage; this.parentId = parentId; this.px = px; this.py = py; @@ -102,21 +102,21 @@ String curIdStr = null; int x=35, y=1; int pos; - List<SubQueryInfo> subQueryInfos = new ArrayList<SubQueryInfo>(); + List<StageInfo> stageInfos = new ArrayList<StageInfo>(); - subQueryInfos.add(new SubQueryInfo(masterPlan.getRoot(), null, null, x, y, 0)); + stageInfos.add(new StageInfo(masterPlan.getRoot(), null, null, x, y, 0)); - while (!subQueryInfos.isEmpty()) { - SubQueryInfo eachSubQueryInfo = subQueryInfos.remove(0); - curIdStr = eachSubQueryInfo.executionBlock.getId().toString(); + while (!stageInfos.isEmpty()) { + StageInfo eachStageInfo = stageInfos.remove(0); + curIdStr = eachStageInfo.executionBlock.getId().toString(); - y = eachSubQueryInfo.py + 13; - if (eachSubQueryInfo.pos == 0) { - x = eachSubQueryInfo.px; - } else if (eachSubQueryInfo.pos == 1) { - x = eachSubQueryInfo.px - 20; - } else if (eachSubQueryInfo.pos == 2) { - x = eachSubQueryInfo.px + 20; + y = eachStageInfo.py + 13; + if (eachStageInfo.pos == 0) { + x = eachStageInfo.px; + } else if (eachStageInfo.pos == 1) { + x = eachStageInfo.px - 20; + } else if (eachStageInfo.pos == 2) { + x = eachStageInfo.px + 20; } %> <script type='text/javascript'> @@ -128,17 +128,17 @@ </div> <% - if (eachSubQueryInfo.parentId != null) { + if (eachStageInfo.parentId != null) { String outgoing = ""; String prefix = ""; - for (DataChannel channel : masterPlan.getOutgoingChannels(eachSubQueryInfo.executionBlock.getId())) { + for (DataChannel channel : masterPlan.getOutgoingChannels(eachStageInfo.executionBlock.getId())) { outgoing += prefix + channel.getShuffleType(); prefix = "; "; } %> <script type="text/javascript"> var srcId = "<%=curIdStr%>"; - var destId = "<%=eachSubQueryInfo.parentId.toString()%>"; + var destId = "<%=eachStageInfo.parentId.toString()%>"; var src = window.jsPlumb.addEndpoint(srcId, { anchor:"AutoDefault", paintStyle:{ @@ -187,7 +187,7 @@ <script type='text/javascript'> var e = document.getElementById("<%=curIdStr%>"); - var state = "<%=eachSubQueryInfo.subQuery != null ? eachSubQueryInfo.subQuery.getState().name(): ""%>"; + var state = "<%=eachStageInfo.stage != null ? eachStageInfo.stage.getState().name(): ""%>"; switch (state) { case 'NEW': e.style.borderColor = "black"; @@ -219,7 +219,7 @@ </script> <% - List<ExecutionBlock> children = masterPlan.getChilds(eachSubQueryInfo.executionBlock.getId()); + List<ExecutionBlock> children = masterPlan.getChilds(eachStageInfo.executionBlock.getId()); if (children.size() == 1) { pos = 0; @@ -227,7 +227,7 @@ pos = 1; } for (ExecutionBlock child : children) { - subQueryInfos.add(new SubQueryInfo(child, subQueryMap.get(child.getId()), eachSubQueryInfo.executionBlock.getId(), x, y, pos++)); + stageInfos.add(new StageInfo(child, stageMap.get(child.getId()), eachStageInfo.executionBlock.getId(), x, y, pos++)); } } //end of while %> http://git-wip-us.apache.org/repos/asf/tajo/blob/3c833e2a/tajo-core/src/main/resources/webapps/worker/querytasks.jsp ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/resources/webapps/worker/querytasks.jsp b/tajo-core/src/main/resources/webapps/worker/querytasks.jsp index 265937c..3aef49d 100644 --- a/tajo-core/src/main/resources/webapps/worker/querytasks.jsp +++ b/tajo-core/src/main/resources/webapps/worker/querytasks.jsp @@ -78,14 +78,14 @@ } Query query = queryMasterTask.getQuery(); - SubQuery subQuery = query.getSubQuery(ebid); + Stage stage = query.getStage(ebid); - if(subQuery == null) { + if(stage == null) { out.write("<script type='text/javascript'>alert('no sub-query'); history.back(0); </script>"); return; } - if(subQuery == null) { + if(stage == null) { %> <script type="text/javascript"> alert("No Execution Block for" + ebid); @@ -97,7 +97,7 @@ SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); - Task[] allTasks = subQuery.getTasks(); + Task[] allTasks = stage.getTasks(); long totalInputBytes = 0; long totalReadBytes = 0; @@ -105,8 +105,6 @@ long totalWriteBytes = 0; long totalWriteRows = 0; int numTasks = allTasks.length; -// int numSucceededTasks = 0; -// int localReadTasks = subQuery.; int numShuffles = 0; float totalProgress = 0.0f; @@ -166,12 +164,12 @@ <h3><a href='querydetail.jsp?queryId=<%=paramQueryId%>'><%=ebid.toString()%></a></h3> <hr/> <p/> - <pre style="white-space:pre-wrap;"><%=PlannerUtil.buildExplainString(subQuery.getBlock().getPlan())%></pre> + <pre style="white-space:pre-wrap;"><%=PlannerUtil.buildExplainString(stage.getBlock().getPlan())%></pre> <p/> <table border="1" width="100%" class="border_table"> - <tr><td align='right' width='180px'>Status:</td><td><%=subQuery.getState()%></td></tr> - <tr><td align='right'>Started:</td><td><%=df.format(subQuery.getStartTime())%> ~ <%=subQuery.getFinishTime() == 0 ? "-" : df.format(subQuery.getFinishTime())%></td></tr> - <tr><td align='right'># Tasks:</td><td><%=numTasks%> (Local Tasks: <%=subQuery.getTaskScheduler().getHostLocalAssigned()%>, Rack Local Tasks: <%=subQuery.getTaskScheduler().getRackLocalAssigned()%>)</td></tr> + <tr><td align='right' width='180px'>Status:</td><td><%=stage.getState()%></td></tr> + <tr><td align='right'>Started:</td><td><%=df.format(stage.getStartTime())%> ~ <%=stage.getFinishTime() == 0 ? "-" : df.format(stage.getFinishTime())%></td></tr> + <tr><td align='right'># Tasks:</td><td><%=numTasks%> (Local Tasks: <%=stage.getTaskScheduler().getHostLocalAssigned()%>, Rack Local Tasks: <%=stage.getTaskScheduler().getRackLocalAssigned()%>)</td></tr> <tr><td align='right'>Progress:</td><td><%=JSPUtil.percentFormat((float) (totalProgress / numTasks))%>%</td></tr> <tr><td align='right'># Shuffles:</td><td><%=numShuffles%></td></tr> <tr><td align='right'>Input Bytes:</td><td><%=FileUtil.humanReadableByteCount(totalInputBytes, false) + " (" + nf.format(totalInputBytes) + " B)"%></td></tr> http://git-wip-us.apache.org/repos/asf/tajo/blob/3c833e2a/tajo-core/src/main/resources/webapps/worker/task.jsp ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/resources/webapps/worker/task.jsp b/tajo-core/src/main/resources/webapps/worker/task.jsp index 5abbd8c..81b1e6d 100644 --- a/tajo-core/src/main/resources/webapps/worker/task.jsp +++ b/tajo-core/src/main/resources/webapps/worker/task.jsp @@ -28,7 +28,7 @@ <%@ page import="org.apache.tajo.master.querymaster.Query" %> <%@ page import="org.apache.tajo.master.querymaster.QueryMasterTask" %> <%@ page import="org.apache.tajo.master.querymaster.Task" %> -<%@ page import="org.apache.tajo.master.querymaster.SubQuery" %> +<%@ page import="org.apache.tajo.master.querymaster.Stage" %> <%@ page import="org.apache.tajo.storage.DataLocation" %> <%@ page import="org.apache.tajo.storage.fragment.FileFragment" %> <%@ page import="org.apache.tajo.storage.fragment.FragmentConvertor" %> @@ -65,14 +65,14 @@ } Query query = queryMasterTask.getQuery(); - SubQuery subQuery = query.getSubQuery(ebid); + Stage stage = query.getStage(ebid); - if(subQuery == null) { + if(stage == null) { out.write("<script type='text/javascript'>alert('no sub-query'); history.back(0); </script>"); return; } - if(subQuery == null) { + if(stage == null) { %> <script type="text/javascript"> alert("No Execution Block for" + ebid); @@ -84,7 +84,7 @@ SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); TaskId taskId = new TaskId(ebid, taskSeq); - Task task = subQuery.getTask(taskId); + Task task = stage.getTask(taskId); if(task == null) { %> <script type="text/javascript"> http://git-wip-us.apache.org/repos/asf/tajo/blob/3c833e2a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java index 9868297..7dc1089 100644 --- a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java +++ b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java @@ -45,8 +45,8 @@ import org.apache.tajo.conf.TajoConf.ConfVars; import org.apache.tajo.master.TajoMaster; import org.apache.tajo.master.querymaster.Query; import org.apache.tajo.master.querymaster.QueryMasterTask; -import org.apache.tajo.master.querymaster.SubQuery; -import org.apache.tajo.master.querymaster.SubQueryState; +import org.apache.tajo.master.querymaster.Stage; +import org.apache.tajo.master.querymaster.StageState; import org.apache.tajo.master.rm.TajoWorkerResourceManager; import org.apache.tajo.util.CommonTestingUtil; import org.apache.tajo.util.KeyValueSet; @@ -804,10 +804,10 @@ public class TajoTestingCluster { } } - public void waitForSubQueryState(SubQuery subQuery, SubQueryState expected, int delay) throws Exception { + public void waitForStageState(Stage stage, StageState expected, int delay) throws Exception { int i = 0; - while (subQuery == null || subQuery.getSynchronizedState() != expected) { + while (stage == null || stage.getSynchronizedState() != expected) { try { Thread.sleep(delay); } catch (InterruptedException e) { http://git-wip-us.apache.org/repos/asf/tajo/blob/3c833e2a/tajo-core/src/test/java/org/apache/tajo/TestQueryIdFactory.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/TestQueryIdFactory.java b/tajo-core/src/test/java/org/apache/tajo/TestQueryIdFactory.java index 836dd2f..8dc95de 100644 --- a/tajo-core/src/test/java/org/apache/tajo/TestQueryIdFactory.java +++ b/tajo-core/src/test/java/org/apache/tajo/TestQueryIdFactory.java @@ -38,12 +38,12 @@ public class TestQueryIdFactory { } @Test - public void testNewSubQueryId() { + public void testNewStageId() { QueryId qid = LocalTajoTestingUtility.newQueryId(); MasterPlan plan = new MasterPlan(qid, null, null); - ExecutionBlockId subqid1 = plan.newExecutionBlockId(); - ExecutionBlockId subqid2 = plan.newExecutionBlockId(); - assertTrue(subqid1.compareTo(subqid2) < 0); + ExecutionBlockId stageId1 = plan.newExecutionBlockId(); + ExecutionBlockId stageId2 = plan.newExecutionBlockId(); + assertTrue(stageId1.compareTo(stageId2) < 0); } @Test http://git-wip-us.apache.org/repos/asf/tajo/blob/3c833e2a/tajo-core/src/test/java/org/apache/tajo/client/TestTajoClient.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/client/TestTajoClient.java b/tajo-core/src/test/java/org/apache/tajo/client/TestTajoClient.java index d187071..113288a 100644 --- a/tajo-core/src/test/java/org/apache/tajo/client/TestTajoClient.java +++ b/tajo-core/src/test/java/org/apache/tajo/client/TestTajoClient.java @@ -38,7 +38,7 @@ import org.apache.tajo.conf.TajoConf; import org.apache.tajo.ipc.ClientProtos; import org.apache.tajo.ipc.ClientProtos.QueryHistoryProto; import org.apache.tajo.ipc.ClientProtos.QueryInfoProto; -import org.apache.tajo.ipc.ClientProtos.SubQueryHistoryProto; +import org.apache.tajo.ipc.ClientProtos.StageHistoryProto; import org.apache.tajo.jdbc.TajoResultSet; import org.apache.tajo.storage.StorageConstants; import org.apache.tajo.storage.StorageUtil; @@ -785,13 +785,13 @@ public class TestTajoClient { QueryHistoryProto queryHistory = client.getQueryHistory(queryId); assertNotNull(queryHistory); assertEquals(queryId.toString(), queryHistory.getQueryId()); - assertEquals(2, queryHistory.getSubQueryHistoriesCount()); + assertEquals(2, queryHistory.getStageHistoriesCount()); - List<SubQueryHistoryProto> taskHistories = - new ArrayList<SubQueryHistoryProto>(queryHistory.getSubQueryHistoriesList()); - Collections.sort(taskHistories, new Comparator<SubQueryHistoryProto>() { + List<ClientProtos.StageHistoryProto> taskHistories = + new ArrayList<StageHistoryProto>(queryHistory.getStageHistoriesList()); + Collections.sort(taskHistories, new Comparator<StageHistoryProto>() { @Override - public int compare(SubQueryHistoryProto o1, SubQueryHistoryProto o2) { + public int compare(ClientProtos.StageHistoryProto o1, StageHistoryProto o2) { return o1.getExecutionBlockId().compareTo(o2.getExecutionBlockId()); } }); http://git-wip-us.apache.org/repos/asf/tajo/blob/3c833e2a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java index edbe029..bfd1700 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java @@ -27,7 +27,7 @@ import org.apache.tajo.conf.TajoConf.ConfVars; import org.apache.tajo.master.querymaster.Query; import org.apache.tajo.master.querymaster.QueryMasterTask; import org.apache.tajo.master.querymaster.Task; -import org.apache.tajo.master.querymaster.SubQuery; +import org.apache.tajo.master.querymaster.Stage; import org.apache.tajo.storage.StorageConstants; import org.apache.tajo.util.KeyValueSet; import org.apache.tajo.util.TUtil; @@ -367,7 +367,7 @@ public class TestGroupByQuery extends QueryTestCaseBase { assertResultSet(res, "testDistinctAggregation_case4.result"); res.close(); - // two groupby, two distinct, two aggregation with subquery + // two groupby, two distinct, two aggregation with stage res = executeFile("testDistinctAggregation_case5.sql"); assertResultSet(res, "testDistinctAggregation_case5.result"); res.close(); @@ -731,12 +731,12 @@ public class TestGroupByQuery extends QueryTestCaseBase { Set<Integer> partitionIds = new HashSet<Integer>(); Query query = qmTasks.get(qmTasks.size() - 1).getQuery(); - Collection<SubQuery> subQueries = query.getSubQueries(); - assertNotNull(subQueries); - assertTrue(!subQueries.isEmpty()); - for (SubQuery subQuery: subQueries) { - if (subQuery.getId().toStringNoPrefix().endsWith("_000001")) { - for (Task.IntermediateEntry eachInterm: subQuery.getHashShuffleIntermediateEntries()) { + Collection<Stage> stages = query.getStages(); + assertNotNull(stages); + assertTrue(!stages.isEmpty()); + for (Stage stage : stages) { + if (stage.getId().toStringNoPrefix().endsWith("_000001")) { + for (Task.IntermediateEntry eachInterm: stage.getHashShuffleIntermediateEntries()) { partitionIds.add(eachInterm.getPartId()); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/3c833e2a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java index b1e1bec..3400752 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java @@ -287,7 +287,7 @@ public class TestTablePartitions extends QueryTestCaseBase { assertResultSet(res, "case12.result"); res.close(); - // alias partition column in subquery + // alias partition column in stage res = executeFile("case13.sql"); assertResultSet(res, "case13.result"); res.close(); http://git-wip-us.apache.org/repos/asf/tajo/blob/3c833e2a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestUnionQuery.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestUnionQuery.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestUnionQuery.java index 3d292a4..d46d110 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestUnionQuery.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestUnionQuery.java @@ -145,7 +145,7 @@ public class TestUnionQuery extends QueryTestCaseBase { @Test public final void testUnion12() throws Exception { // test filter pushdown - // with subquery in union query + // with stage in union query ResultSet res = executeQuery(); assertResultSet(res); cleanupQuery(res); @@ -154,7 +154,7 @@ public class TestUnionQuery extends QueryTestCaseBase { @Test public final void testUnion13() throws Exception { // test filter pushdown - // with subquery in union query + // with stage in union query ResultSet res = executeQuery(); assertResultSet(res); cleanupQuery(res); @@ -163,7 +163,7 @@ public class TestUnionQuery extends QueryTestCaseBase { @Test public final void testUnion14() throws Exception { // test filter pushdown - // with group by subquery in union query + // with group by stage in union query ResultSet res = executeQuery(); assertResultSet(res); cleanupQuery(res); http://git-wip-us.apache.org/repos/asf/tajo/blob/3c833e2a/tajo-core/src/test/java/org/apache/tajo/master/querymaster/TestKillQuery.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/master/querymaster/TestKillQuery.java b/tajo-core/src/test/java/org/apache/tajo/master/querymaster/TestKillQuery.java index 37ee402..c1f4178 100644 --- a/tajo-core/src/test/java/org/apache/tajo/master/querymaster/TestKillQuery.java +++ b/tajo-core/src/test/java/org/apache/tajo/master/querymaster/TestKillQuery.java @@ -85,13 +85,13 @@ public class TestKillQuery { assertEquals(TajoProtos.QueryState.QUERY_RUNNING, queryMasterTask.getQuery().getSynchronizedState()); } - SubQuery subQuery = queryMasterTask.getQuery().getSubQueries().iterator().next(); - assertNotNull(subQuery); + Stage stage = queryMasterTask.getQuery().getStages().iterator().next(); + assertNotNull(stage); try{ - cluster.waitForSubQueryState(subQuery, SubQueryState.INITED, 2); + cluster.waitForStageState(stage, StageState.INITED, 2); } finally { - assertEquals(SubQueryState.INITED, subQuery.getSynchronizedState()); + assertEquals(StageState.INITED, stage.getSynchronizedState()); } // fire kill event http://git-wip-us.apache.org/repos/asf/tajo/blob/3c833e2a/tajo-core/src/test/java/org/apache/tajo/master/querymaster/TestTaskStatusUpdate.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/master/querymaster/TestTaskStatusUpdate.java b/tajo-core/src/test/java/org/apache/tajo/master/querymaster/TestTaskStatusUpdate.java index 1c2a1a8..3a54478 100644 --- a/tajo-core/src/test/java/org/apache/tajo/master/querymaster/TestTaskStatusUpdate.java +++ b/tajo-core/src/test/java/org/apache/tajo/master/querymaster/TestTaskStatusUpdate.java @@ -103,7 +103,7 @@ public class TestTaskStatusUpdate extends QueryTestCaseBase { String actualResult = resultSetToString(res); System.out.println(actualResult); - // in/out * subquery(4) + // in/out * stage(4) long[] expectedNumRows = new long[]{2, 2, 5, 5, 7, 2, 2, 2}; long[] expectedNumBytes = new long[]{8, 34, 20, 75, 109, 34, 34, 18}; long[] expectedReadBytes = new long[]{8, 8, 20, 20, 109, 0, 34, 0}; @@ -131,7 +131,7 @@ public class TestTaskStatusUpdate extends QueryTestCaseBase { res.close(); } - private void assertStatus(int numSubQueries, + private void assertStatus(int numStages, long[] expectedNumRows, long[] expectedNumBytes, long[] expectedReadBytes) throws Exception { @@ -160,20 +160,20 @@ public class TestTaskStatusUpdate extends QueryTestCaseBase { assertNotNull(query); - List<SubQuery> subQueries = new ArrayList<SubQuery>(query.getSubQueries()); - assertEquals(numSubQueries, subQueries.size()); + List<Stage> stages = new ArrayList<Stage>(query.getStages()); + assertEquals(numStages, stages.size()); - Collections.sort(subQueries, new Comparator<SubQuery>() { + Collections.sort(stages, new Comparator<Stage>() { @Override - public int compare(SubQuery o1, SubQuery o2) { + public int compare(Stage o1, Stage o2) { return o1.getId().compareTo(o2.getId()); } }); int index = 0; - for (SubQuery eachSubQuery: subQueries) { - TableStats inputStats = eachSubQuery.getInputStats(); - TableStats resultStats = eachSubQuery.getResultStats(); + for (Stage eachStage : stages) { + TableStats inputStats = eachStage.getInputStats(); + TableStats resultStats = eachStage.getResultStats(); assertNotNull(inputStats); assertEquals(expectedNumRows[index], inputStats.getNumRows().longValue()); http://git-wip-us.apache.org/repos/asf/tajo/blob/3c833e2a/tajo-core/src/test/java/org/apache/tajo/util/history/TestHistoryWriterReader.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/util/history/TestHistoryWriterReader.java b/tajo-core/src/test/java/org/apache/tajo/util/history/TestHistoryWriterReader.java index 8d6a94d..632e9c2 100644 --- a/tajo-core/src/test/java/org/apache/tajo/util/history/TestHistoryWriterReader.java +++ b/tajo-core/src/test/java/org/apache/tajo/util/history/TestHistoryWriterReader.java @@ -126,12 +126,12 @@ public class TestHistoryWriterReader extends QueryTestCaseBase { QueryId queryId = QueryIdFactory.newQueryId(startTime, 1); queryHistory.setQueryId(queryId.toString()); queryHistory.setLogicalPlan("LogicalPlan"); - List<SubQueryHistory> subQueries = new ArrayList<SubQueryHistory>(); + List<StageHistory> stages = new ArrayList<StageHistory>(); for (int i = 0; i < 3; i++) { ExecutionBlockId ebId = QueryIdFactory.newExecutionBlockId(queryId, i); - SubQueryHistory subQueryHistory = new SubQueryHistory(); - subQueryHistory.setExecutionBlockId(ebId.toString()); - subQueryHistory.setStartTime(startTime + i); + StageHistory stageHistory = new StageHistory(); + stageHistory.setExecutionBlockId(ebId.toString()); + stageHistory.setStartTime(startTime + i); List<TaskHistory> taskHistories = new ArrayList<TaskHistory>(); for (int j = 0; j < 5; j++) { @@ -139,10 +139,10 @@ public class TestHistoryWriterReader extends QueryTestCaseBase { taskHistory.setId(QueryIdFactory.newTaskAttemptId(QueryIdFactory.newTaskId(ebId), 1).toString()); taskHistories.add(taskHistory); } - subQueryHistory.setTasks(taskHistories); - subQueries.add(subQueryHistory); + stageHistory.setTasks(taskHistories); + stages.add(stageHistory); } - queryHistory.setSubQueryHistories(subQueries); + queryHistory.setStageHistories(stages); writer.appendHistory(queryHistory); @@ -166,16 +166,16 @@ public class TestHistoryWriterReader extends QueryTestCaseBase { QueryHistory foundQueryHistory = reader.getQueryHistory(queryId.toString()); assertNotNull(foundQueryHistory); assertEquals(queryId.toString(), foundQueryHistory.getQueryId()); - assertEquals(3, foundQueryHistory.getSubQueryHistories().size()); + assertEquals(3, foundQueryHistory.getStageHistories().size()); for (int i = 0; i < 3; i++) { String ebId = QueryIdFactory.newExecutionBlockId(queryId, i).toString(); - SubQueryHistory subQueryHistory = foundQueryHistory.getSubQueryHistories().get(i); - assertEquals(ebId, subQueryHistory.getExecutionBlockId()); - assertEquals(startTime + i, subQueryHistory.getStartTime()); + StageHistory stageHistory = foundQueryHistory.getStageHistories().get(i); + assertEquals(ebId, stageHistory.getExecutionBlockId()); + assertEquals(startTime + i, stageHistory.getStartTime()); // TaskHistory is stored in the other file. - assertNull(subQueryHistory.getTasks()); + assertNull(stageHistory.getTasks()); List<TaskHistory> tasks = reader.getTaskHistory(queryId.toString(), ebId); assertNotNull(tasks); @@ -183,7 +183,7 @@ public class TestHistoryWriterReader extends QueryTestCaseBase { for (int j = 0; j < 5; j++) { TaskHistory taskHistory = tasks.get(j); - assertEquals(subQueries.get(i).getTasks().get(j).getId(), taskHistory.getId()); + assertEquals(stages.get(i).getTasks().get(j).getId(), taskHistory.getId()); } } } finally { http://git-wip-us.apache.org/repos/asf/tajo/blob/3c833e2a/tajo-dist/pom.xml ---------------------------------------------------------------------- diff --git a/tajo-dist/pom.xml b/tajo-dist/pom.xml index c408b16..69b91f9 100644 --- a/tajo-dist/pom.xml +++ b/tajo-dist/pom.xml @@ -224,7 +224,7 @@ <arguments> <argument>Tajo</argument> <argument>org.apache.tajo.master.querymaster.Query, - org.apache.tajo.master.querymaster.SubQuery, + org.apache.tajo.master.querymaster.Stage, org.apache.tajo.master.querymaster.Task, org.apache.tajo.master.querymaster.TaskAttempt </argument> http://git-wip-us.apache.org/repos/asf/tajo/blob/3c833e2a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerAuxService.java ---------------------------------------------------------------------- diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerAuxService.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerAuxService.java index 1c63c8a..d633058 100644 --- a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerAuxService.java +++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/PullServerAuxService.java @@ -363,18 +363,18 @@ public class PullServerAuxService extends AuxiliaryService { new QueryStringDecoder(request.getUri()).getParameters(); final List<String> types = params.get("type"); final List<String> taskIdList = params.get("ta"); - final List<String> subQueryIds = params.get("sid"); + final List<String> stageIds = params.get("sid"); final List<String> partitionIds = params.get("p"); - if (types == null || taskIdList == null || subQueryIds == null + if (types == null || taskIdList == null || stageIds == null || partitionIds == null) { - sendError(ctx, "Required type, taskIds, subquery Id, and partition id", + sendError(ctx, "Required type, taskIds, stage Id, and partition id", BAD_REQUEST); return; } - if (types.size() != 1 || subQueryIds.size() != 1) { - sendError(ctx, "Required type, taskIds, subquery Id, and partition id", + if (types.size() != 1 || stageIds.size() != 1) { + sendError(ctx, "Required type, taskIds, stage Id, and partition id", BAD_REQUEST); return; } @@ -382,7 +382,7 @@ public class PullServerAuxService extends AuxiliaryService { final List<FileChunk> chunks = Lists.newArrayList(); String repartitionType = types.get(0); - String sid = subQueryIds.get(0); + String sid = stageIds.get(0); String partitionId = partitionIds.get(0); List<String> taskIds = splitMaps(taskIdList); @@ -399,7 +399,7 @@ public class PullServerAuxService extends AuxiliaryService { } LOG.info("PullServer baseDir: " + taskLocalDir + "/" + queryBaseDir); - // if a subquery requires a range partitioning + // if a stage requires a range partitioning if (repartitionType.equals("r")) { String ta = taskIds.get(0); Path path = localFS.makeQualified( @@ -422,7 +422,7 @@ public class PullServerAuxService extends AuxiliaryService { chunks.add(chunk); } - // if a subquery requires a hash repartition or a scattered hash repartition + // if a stage requires a hash repartition or a scattered hash repartition } else if (repartitionType.equals("h") || repartitionType.equals("s")) { for (String ta : taskIds) { Path path = localFS.makeQualified( http://git-wip-us.apache.org/repos/asf/tajo/blob/3c833e2a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java ---------------------------------------------------------------------- diff --git a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java index 5a4e69f..860bc8e 100644 --- a/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java +++ b/tajo-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java @@ -493,19 +493,19 @@ public class TajoPullServerService extends AbstractService { final List<String> types = params.get("type"); final List<String> qids = params.get("qid"); final List<String> taskIdList = params.get("ta"); - final List<String> subQueryIds = params.get("sid"); + final List<String> stageIds = params.get("sid"); final List<String> partIds = params.get("p"); final List<String> offsetList = params.get("offset"); final List<String> lengthList = params.get("length"); - if (types == null || subQueryIds == null || qids == null || partIds == null) { - sendError(ctx, "Required queryId, type, subquery Id, and part id", + if (types == null || stageIds == null || qids == null || partIds == null) { + sendError(ctx, "Required queryId, type, stage Id, and part id", BAD_REQUEST); return; } - if (qids.size() != 1 && types.size() != 1 || subQueryIds.size() != 1) { - sendError(ctx, "Required qids, type, taskIds, subquery Id, and part id", + if (qids.size() != 1 && types.size() != 1 || stageIds.size() != 1) { + sendError(ctx, "Required qids, type, taskIds, stage Id, and part id", BAD_REQUEST); return; } @@ -513,7 +513,7 @@ public class TajoPullServerService extends AbstractService { String partId = partIds.get(0); String queryId = qids.get(0); String shuffleType = types.get(0); - String sid = subQueryIds.get(0); + String sid = stageIds.get(0); long offset = (offsetList != null && !offsetList.isEmpty()) ? Long.parseLong(offsetList.get(0)) : -1L; long length = (lengthList != null && !lengthList.isEmpty()) ? Long.parseLong(lengthList.get(0)) : -1L; @@ -536,7 +536,7 @@ public class TajoPullServerService extends AbstractService { final List<FileChunk> chunks = Lists.newArrayList(); - // if a subquery requires a range shuffle + // if a stage requires a range shuffle if (shuffleType.equals("r")) { String ta = taskIds.get(0); if(!lDirAlloc.ifExists(queryBaseDir + "/" + sid + "/" + ta + "/output/", conf)){ @@ -562,7 +562,7 @@ public class TajoPullServerService extends AbstractService { chunks.add(chunk); } - // if a subquery requires a hash shuffle or a scattered hash shuffle + // if a stage requires a hash shuffle or a scattered hash shuffle } else if (shuffleType.equals("h") || shuffleType.equals("s")) { int partParentId = HashShuffleAppenderManager.getPartParentId(Integer.parseInt(partId), (TajoConf) conf); String partPath = queryBaseDir + "/" + sid + "/hash-shuffle/" + partParentId + "/" + partId; http://git-wip-us.apache.org/repos/asf/tajo/blob/3c833e2a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileStorageManager.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileStorageManager.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileStorageManager.java index 3d4f7d5..c427940 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileStorageManager.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileStorageManager.java @@ -49,7 +49,7 @@ public class FileStorageManager extends StorageManager { private final Log LOG = LogFactory.getLog(FileStorageManager.class); static final String OUTPUT_FILE_PREFIX="part-"; - static final ThreadLocal<NumberFormat> OUTPUT_FILE_FORMAT_SUBQUERY = + static final ThreadLocal<NumberFormat> OUTPUT_FILE_FORMAT_STAGE = new ThreadLocal<NumberFormat>() { @Override public NumberFormat initialValue() { @@ -274,10 +274,10 @@ public class FileStorageManager extends StorageManager { return workDir; } // The final result of a task will be written in a file named part-ss-nnnnnnn, - // where ss is the subquery id associated with this task, and nnnnnn is the task id. + // where ss is the stage id associated with this task, and nnnnnn is the task id. Path outFilePath = StorageUtil.concatPath(workDir, TajoConstants.RESULT_DIR_NAME, OUTPUT_FILE_PREFIX + - OUTPUT_FILE_FORMAT_SUBQUERY.get().format(taskAttemptId.getTaskId().getExecutionBlockId().getId()) + "-" + + OUTPUT_FILE_FORMAT_STAGE.get().format(taskAttemptId.getTaskId().getExecutionBlockId().getId()) + "-" + OUTPUT_FILE_FORMAT_TASK.get().format(taskAttemptId.getTaskId().getId()) + "-" + OUTPUT_FILE_FORMAT_SEQ.get().format(0)); LOG.info("Output File Path: " + outFilePath);
