Repository: hive
Updated Branches:
  refs/heads/master 187829fa9 -> 8d22a60c8
HIVE-12556 : Ctrl-C in beeline doesn't kill Tez query on HS2 (Sergey Shelukhin, reviewed by Gunther Hagleitner) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/8d22a60c Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/8d22a60c Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/8d22a60c Branch: refs/heads/master Commit: 8d22a60c8c4a247aa396b8c3841b6ebdce51f508 Parents: 187829f Author: Sergey Shelukhin <[email protected]> Authored: Wed Dec 2 16:16:35 2015 -0800 Committer: Sergey Shelukhin <[email protected]> Committed: Wed Dec 2 16:16:59 2015 -0800 ---------------------------------------------------------------------- .../java/org/apache/hadoop/hive/ql/Driver.java | 5 + .../hadoop/hive/ql/exec/tez/TezJobMonitor.java | 21 +++- .../apache/hadoop/hive/ql/exec/tez/TezTask.java | 109 ++++++++++++++++++- 3 files changed, 124 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/8d22a60c/ql/src/java/org/apache/hadoop/hive/ql/Driver.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java index 8fafd61..62b608c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java @@ -1901,6 +1901,11 @@ public class Driver implements CommandProcessor { public int close() { try { + try { + releaseResources(); + } catch (Exception e) { + LOG.info("Exception while releasing resources", e); + } if (fetchTask != null) { try { fetchTask.clearFetch(); http://git-wip-us.apache.org/repos/asf/hive/blob/8d22a60c/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java index 59e9d29..f6bc19c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezJobMonitor.java @@ -22,6 +22,7 @@ import static org.apache.tez.dag.api.client.DAGStatus.State.RUNNING; import static org.fusesource.jansi.Ansi.ansi; import java.io.IOException; +import java.io.InterruptedIOException; import java.io.PrintStream; import java.text.DecimalFormat; import java.text.NumberFormat; @@ -135,9 +136,7 @@ public class TezJobMonitor { Runtime.getRuntime().addShutdownHook(new Thread() { @Override public void run() { - for (DAGClient c: shutdownList) { - TezJobMonitor.killRunningJobs(); - } + TezJobMonitor.killRunningJobs(); try { for (TezSessionState s : TezSessionPoolManager.getInstance().getOpenSessions()) { System.err.println("Shutting down tez session."); @@ -346,8 +345,8 @@ public class TezJobMonitor { } } catch (Exception e) { console.printInfo("Exception: " + e.getMessage()); - if (++failedCounter % maxRetryInterval / checkInterval == 0 - || e instanceof InterruptedException) { + boolean isInterrupted = hasInterruptedException(e); + if (isInterrupted || (++failedCounter % maxRetryInterval / checkInterval == 0)) { try { console.printInfo("Killing DAG..."); dagClient.tryKillDAG(); @@ -376,10 +375,22 @@ public class TezJobMonitor { } } } + perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_RUN_DAG); return rc; } + private static boolean hasInterruptedException(Throwable e) { + // Hadoop IPC wraps InterruptedException. GRRR. + while (e != null) { + if (e instanceof InterruptedException || e instanceof InterruptedIOException) { + return true; + } + e = e.getCause(); + } + return false; + } + /** * killRunningJobs tries to terminate execution of all * currently running tez queries. No guarantees, best effort only. 
http://git-wip-us.apache.org/repos/asf/hive/blob/8d22a60c/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java index a6d911d..a2060da 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.exec.tez; +import java.io.IOException; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -29,6 +30,9 @@ import java.util.List; import java.util.Map; import java.util.Set; +import javax.annotation.Nullable; + +import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; @@ -52,6 +56,7 @@ import org.apache.hadoop.hive.ql.plan.api.StageType; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResourceType; import org.apache.tez.client.CallerContext; @@ -64,10 +69,13 @@ import org.apache.tez.dag.api.DAG; import org.apache.tez.dag.api.Edge; import org.apache.tez.dag.api.GroupInputEdge; import org.apache.tez.dag.api.SessionNotRunning; +import org.apache.tez.dag.api.TezException; import org.apache.tez.dag.api.Vertex; import org.apache.tez.dag.api.VertexGroup; import org.apache.tez.dag.api.client.DAGClient; +import org.apache.tez.dag.api.client.DAGStatus; import org.apache.tez.dag.api.client.StatusGetOpts; +import org.apache.tez.dag.api.client.VertexStatus; import org.json.JSONObject; /** @@ -86,6 +94,8 @@ public class TezTask extends 
Task<TezWork> { private final DagUtils utils; + private DAGClient dagClient = null; + Map<BaseWork, Vertex> workToVertex = new HashMap<BaseWork, Vertex>(); Map<BaseWork, JobConf> workToConf = new HashMap<BaseWork, JobConf>(); @@ -107,7 +117,6 @@ public class TezTask extends Task<TezWork> { int rc = 1; boolean cleanContext = false; Context ctx = null; - DAGClient client = null; TezSessionState session = null; try { @@ -177,12 +186,12 @@ public class TezTask extends Task<TezWork> { addExtraResourcesToDag(session, dag, inputOutputJars, inputOutputLocalResources); // submit will send the job to the cluster and start executing - client = submit(jobConf, dag, scratchDir, appJarLr, session, + dagClient = submit(jobConf, dag, scratchDir, appJarLr, session, additionalLr, inputOutputJars, inputOutputLocalResources); // finally monitor will print progress until the job is done TezJobMonitor monitor = new TezJobMonitor(work.getWorkMap()); - rc = monitor.monitorExecution(client, ctx.getHiveTxnManager(), conf, dag); + rc = monitor.monitorExecution(dagClient, ctx.getHiveTxnManager(), conf, dag); if (rc != 0) { this.setException(new HiveException(monitor.getDiagnostics())); } @@ -190,7 +199,7 @@ public class TezTask extends Task<TezWork> { // fetch the counters try { Set<StatusGetOpts> statusGetOpts = EnumSet.of(StatusGetOpts.GET_COUNTERS); - counters = client.getDAGStatus(statusGetOpts).getDAGCounters(); + counters = dagClient.getDAGStatus(statusGetOpts).getDAGCounters(); } catch (Exception err) { // Don't fail execution due to counters - just don't print summary info LOG.error("Failed to get counters: " + err, err); @@ -231,7 +240,7 @@ public class TezTask extends Task<TezWork> { } } // need to either move tmp files or remove them - if (client != null) { + if (dagClient != null) { // rc will only be overwritten if close errors out rc = close(work, rc); } @@ -462,7 +471,7 @@ public class TezTask extends Task<TezWork> { } perfLogger.PerfLogEnd(CLASS_NAME, 
PerfLogger.TEZ_SUBMIT_DAG); - return dagClient; + return new SyncDagClient(dagClient); } /* @@ -544,4 +553,92 @@ public class TezTask extends Task<TezWork> { return ((ReduceWork)children.get(0)).getReducer(); } + + @Override + public void shutdown() { + super.shutdown(); + if (dagClient != null) { + LOG.info("Shutting down Tez task " + this); + try { + dagClient.tryKillDAG(); + LOG.info("Waiting for Tez task to shut down: " + this); + dagClient.waitForCompletion(); + } catch (Exception ex) { + LOG.info("Failed to shut down TezTask" + this, ex); + } + } + } + + /** DAG client that does dumb global sync on all the method calls; + * Tez DAG client is not thread safe and getting the 2nd one is not recommended. */ + public class SyncDagClient extends DAGClient { + private final DAGClient dagClient; + + public SyncDagClient(DAGClient dagClient) { + super(); + this.dagClient = dagClient; + } + + @Override + public void close() throws IOException { + dagClient.close(); // Don't sync. + } + + @Override + public String getExecutionContext() { + return dagClient.getExecutionContext(); // Don't sync. + } + + @Override + @Private + protected ApplicationReport getApplicationReportInternal() { + throw new UnsupportedOperationException(); // The method is not exposed, and we don't use it. 
+ } + + @Override + public DAGStatus getDAGStatus(@Nullable Set<StatusGetOpts> statusOptions) + throws IOException, TezException { + synchronized (dagClient) { + return dagClient.getDAGStatus(statusOptions); + } + } + + @Override + public DAGStatus getDAGStatus(@Nullable Set<StatusGetOpts> statusOptions, + long timeout) throws IOException, TezException { + synchronized (dagClient) { + return dagClient.getDAGStatus(statusOptions, timeout); + } + } + + @Override + public VertexStatus getVertexStatus(String vertexName, + Set<StatusGetOpts> statusOptions) throws IOException, TezException { + synchronized (dagClient) { + return dagClient.getVertexStatus(vertexName, statusOptions); + } + } + + @Override + public void tryKillDAG() throws IOException, TezException { + synchronized (dagClient) { + dagClient.tryKillDAG(); + } + } + + @Override + public DAGStatus waitForCompletion() throws IOException, TezException, InterruptedException { + synchronized (dagClient) { + return dagClient.waitForCompletion(); + } + } + + @Override + public DAGStatus waitForCompletionWithStatusUpdates(@Nullable Set<StatusGetOpts> statusGetOpts) + throws IOException, TezException, InterruptedException { + synchronized (dagClient) { + return dagClient.waitForCompletionWithStatusUpdates(statusGetOpts); + } + } + } }
