Repository: hive
Updated Branches:
  refs/heads/master 9179178e4 -> 9e1fa0ce6
HIVE-13421 : Propagate job progress in operation status Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/9e1fa0ce Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/9e1fa0ce Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/9e1fa0ce Branch: refs/heads/master Commit: 9e1fa0ce6f2003300640f0bee9b267e33d714084 Parents: 9179178 Author: Rajat Khandelwal <pro...@apache.org> Authored: Sat Apr 30 08:16:51 2016 +0530 Committer: Amareshwari Sriramadasu <amareshw...@apache.org> Committed: Sat Apr 30 08:16:51 2016 +0530 ---------------------------------------------------------------------- .../service/cli/session/TestQueryDisplay.java | 4 ++-- .../java/org/apache/hadoop/hive/ql/Driver.java | 13 +++++++++++-- .../org/apache/hadoop/hive/ql/QueryDisplay.java | 19 +++++++++++++++++-- .../org/apache/hadoop/hive/ql/QueryPlan.java | 16 +--------------- .../hadoop/hive/ql/exec/ConditionalTask.java | 1 + .../org/apache/hadoop/hive/ql/exec/Task.java | 14 ++++++++++++-- .../hadoop/hive/ql/exec/mr/ExecDriver.java | 2 +- .../hive/ql/exec/mr/HadoopJobExecHelper.java | 6 ++++-- .../apache/hive/service/cli/CLIServiceTest.java | 4 +++- 9 files changed, 52 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/9e1fa0ce/itests/hive-unit/src/test/java/org/apache/hive/service/cli/session/TestQueryDisplay.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hive/service/cli/session/TestQueryDisplay.java b/itests/hive-unit/src/test/java/org/apache/hive/service/cli/session/TestQueryDisplay.java index 98581e0..cc18ce7 100644 --- a/itests/hive-unit/src/test/java/org/apache/hive/service/cli/session/TestQueryDisplay.java +++ b/itests/hive-unit/src/test/java/org/apache/hive/service/cli/session/TestQueryDisplay.java @@ -154,8 +154,8 @@ public class 
TestQueryDisplay { Assert.assertTrue(qDisplay1.getPerfLogStarts(QueryDisplay.Phase.COMPILATION).size() > 0); Assert.assertTrue(qDisplay1.getPerfLogEnds(QueryDisplay.Phase.COMPILATION).size() > 0); - Assert.assertEquals(qDisplay1.getTaskDisplays().size(), 2); - QueryDisplay.TaskDisplay tInfo1 = qDisplay1.getTaskDisplays().get(1); + Assert.assertEquals(qDisplay1.getTaskDisplays().size(), 1); + QueryDisplay.TaskDisplay tInfo1 = qDisplay1.getTaskDisplays().get(0); Assert.assertEquals(tInfo1.getTaskId(), "Stage-0"); Assert.assertEquals(tInfo1.getTaskType(), StageType.DDL); Assert.assertTrue(tInfo1.getBeginTime() > 0 && tInfo1.getBeginTime() <= System.currentTimeMillis()); http://git-wip-us.apache.org/repos/asf/hive/blob/9e1fa0ce/ql/src/java/org/apache/hadoop/hive/ql/Driver.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java index dad43fb..32d2cb2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java @@ -450,7 +450,7 @@ public class Driver implements CommandProcessor { schema = getSchema(sem, conf); plan = new QueryPlan(queryStr, sem, perfLogger.getStartTime(PerfLogger.DRIVER_RUN), queryId, - queryState.getHiveOperation(), schema, queryDisplay); + queryState.getHiveOperation(), schema); conf.setQueryString(queryStr); @@ -1507,7 +1507,7 @@ public class Driver implements CommandProcessor { perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.PRE_HOOK + peh.getClass().getName()); } } - + setQueryDisplays(plan.getRootTasks()); int mrJobs = Utilities.getMRTasks(plan.getRootTasks()).size(); int jobs = mrJobs + Utilities.getTezTasks(plan.getRootTasks()).size() @@ -1739,6 +1739,15 @@ public class Driver implements CommandProcessor { return (0); } + private void setQueryDisplays(List<Task<? extends Serializable>> tasks) { + if (tasks != null) { + for (Task<? 
extends Serializable> task : tasks) { + task.setQueryDisplay(queryDisplay); + setQueryDisplays(task.getDependentTasks()); + } + } + } + private void logMrWarning(int mrJobs) { if (mrJobs <= 0 || !("mr".equals(HiveConf.getVar(conf, ConfVars.HIVE_EXECUTION_ENGINE)))) { return; http://git-wip-us.apache.org/repos/asf/hive/blob/9e1fa0ce/ql/src/java/org/apache/hadoop/hive/ql/QueryDisplay.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/QueryDisplay.java b/ql/src/java/org/apache/hadoop/hive/ql/QueryDisplay.java index d582bc0..703e997 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/QueryDisplay.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/QueryDisplay.java @@ -79,6 +79,8 @@ public class QueryDisplay { private String name; private boolean requireLock; private boolean retryIfFail; + private String statusMessage; + // required for jackson public TaskDisplay() { @@ -158,15 +160,28 @@ public class QueryDisplay { if (externalHandle == null && tTask.getExternalHandle() != null) { this.externalHandle = tTask.getExternalHandle(); } + setStatusMessage(tTask.getStatusMessage()); switch (taskState) { case RUNNING: - beginTime = System.currentTimeMillis(); + if (beginTime == null) { + beginTime = System.currentTimeMillis(); + } break; case FINISHED: - endTime = System.currentTimeMillis(); + if (endTime == null) { + endTime = System.currentTimeMillis(); + } break; } } + + public synchronized String getStatusMessage() { + return statusMessage; + } + + public synchronized void setStatusMessage(String statusMessage) { + this.statusMessage = statusMessage; + } } public synchronized void setTaskResult(String taskId, TaskResult result) { TaskDisplay taskDisplay = tasks.get(taskId); http://git-wip-us.apache.org/repos/asf/hive/blob/9e1fa0ce/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java ---------------------------------------------------------------------- diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java b/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java index ef0923d..e8c8ae6 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java @@ -113,26 +113,12 @@ public class QueryPlan implements Serializable { } public QueryPlan(String queryString, BaseSemanticAnalyzer sem, Long startTime, String queryId, - HiveOperation operation, Schema resultSchema) { - this(queryString, sem, startTime, queryId, operation, resultSchema, null); - } - public QueryPlan(String queryString, BaseSemanticAnalyzer sem, Long startTime, String queryId, - HiveOperation operation, Schema resultSchema, QueryDisplay queryDisplay) { + HiveOperation operation, Schema resultSchema) { this.queryString = queryString; rootTasks = new ArrayList<Task<? extends Serializable>>(sem.getAllRootTasks()); reducerTimeStatsPerJobList = new ArrayList<ReducerTimeStatsPerJob>(); fetchTask = sem.getFetchTask(); - if (queryDisplay != null) { - if (fetchTask != null) { - fetchTask.setQueryDisplay(queryDisplay); - } - if (rootTasks!= null) { - for (Task t : rootTasks) { - t.setQueryDisplay(queryDisplay); - } - } - } // Note that inputs and outputs can be changed when the query gets executed inputs = sem.getAllInputs(); outputs = sem.getAllOutputs(); http://git-wip-us.apache.org/repos/asf/hive/blob/9e1fa0ce/ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java index c96c813..52cb445 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java @@ -200,6 +200,7 @@ public class ConditionalTask extends Task<ConditionalWork> implements Serializab public boolean addDependentTask(Task<? 
extends Serializable> dependent) { boolean ret = false; if (getListTasks() != null) { + ret = true; for (Task<? extends Serializable> tsk : getListTasks()) { ret = ret & tsk.addDependentTask(dependent); } http://git-wip-us.apache.org/repos/asf/hive/blob/9e1fa0ce/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java index 897af5e..34bdafd 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java @@ -83,8 +83,18 @@ public abstract class Task<T extends Serializable> implements Serializable, Node protected String id; protected T work; private TaskState taskState = TaskState.CREATED; + private String statusMessage; private transient boolean fetchSource; + public void setStatusMessage(String statusMessage) { + this.statusMessage = statusMessage; + updateStatusInQueryDisplay(); + } + + public String getStatusMessage() { + return statusMessage; + } + public enum FeedType { DYNAMIC_PARTITIONS, // list of dynamic partitions } @@ -138,13 +148,13 @@ public abstract class Task<T extends Serializable> implements Serializable, Node this.queryDisplay = queryDisplay; } - private void updateStatusInQueryDisplay() { + protected void updateStatusInQueryDisplay() { if (queryDisplay != null) { queryDisplay.updateTaskStatus(this); } } - private void setState(TaskState state) { + protected void setState(TaskState state) { this.taskState = state; updateStatusInQueryDisplay(); } http://git-wip-us.apache.org/repos/asf/hive/blob/9e1fa0ce/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java index 639b0da..926f6e8 100644 --- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java @@ -432,7 +432,7 @@ public class ExecDriver extends Task<MapredWork> implements Serializable, Hadoop // Finally SUBMIT the JOB! rj = jc.submitJob(job); this.jobID = rj.getJobID(); - + updateStatusInQueryDisplay(); returnVal = jobExecHelper.progress(rj, jc); success = (returnVal == 0); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/hive/blob/9e1fa0ce/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java index 760ba6c..11f5cfd 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java @@ -367,12 +367,14 @@ public class HadoopJobExecHelper { } } console.printInfo(output); + task.setStatusMessage(output); reportTime = System.currentTimeMillis(); } if (cpuMsec > 0) { - console.printInfo("MapReduce Total cumulative CPU time: " - + Utilities.formatMsecToStr(cpuMsec)); + String status = "MapReduce Total cumulative CPU time: " + Utilities.formatMsecToStr(cpuMsec); + console.printInfo(status); + task.setStatusMessage(status); } boolean success; http://git-wip-us.apache.org/repos/asf/hive/blob/9e1fa0ce/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java ---------------------------------------------------------------------- diff --git a/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java b/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java index ff7e9a4..fb8ee4c 100644 --- a/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java +++ b/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java @@ -641,7 +641,8 @@ public abstract class 
CLIServiceTest { SessionHandle sessionHandle = setupTestData(tableName, columnDefinitions, confOverlay); assertNotNull(sessionHandle); // nonblocking execute - String select = "SELECT ID + ' ' FROM TEST_EXEC_ASYNC"; + String select = "select a.id, b.id from (SELECT ID + ' ' `ID` FROM TEST_EXEC_ASYNC) a full outer join " + + "(SELECT ID + ' ' `ID` FROM TEST_EXEC_ASYNC) b on a.ID=b.ID"; OperationHandle ophandle = client.executeStatementAsync(sessionHandle, select, confOverlay); @@ -697,6 +698,7 @@ public abstract class CLIServiceTest { case FINISHED: if (taskDisplay.getTaskType() == StageType.MAPRED || taskDisplay.getTaskType() == StageType.MAPREDLOCAL) { assertNotNull(taskDisplay.getExternalHandle()); + assertNotNull(taskDisplay.getStatusMessage()); } assertNotNull(taskDisplay.getBeginTime()); assertNotNull(taskDisplay.getEndTime());