Author: hashutosh
Date: Thu Mar 13 22:47:37 2014
New Revision: 1577361
URL: http://svn.apache.org/r1577361
Log:
HIVE-6609 : Doing Ctrl-C on hive cli doesn't kill running MR jobs on hadoop-2
(Ashutosh Chauhan via Jason Dere)
Modified:
hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java
hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/BlockMergeTask.java
hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java
hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java
hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/plan/ReducerTimeStatsPerJob.java
Modified: hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java?rev=1577361&r1=1577360&r2=1577361&view=diff
==============================================================================
--- hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java (original)
+++ hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java Thu Mar 13 22:47:37 2014
@@ -182,6 +182,7 @@ public class ExecDriver extends Task<Map
*
* @return true if fatal errors happened during job execution, false
otherwise.
*/
+ @Override
public boolean checkFatalErrors(Counters ctrs, StringBuilder errMsg) {
Counters.Counter cntr = ctrs.findCounter(
HiveConf.getVar(job, HiveConf.ConfVars.HIVECOUNTERGROUP),
@@ -450,7 +451,7 @@ public class ExecDriver extends Task<Map
if (returnVal != 0) {
rj.killJob();
}
- HadoopJobExecHelper.runningJobKillURIs.remove(rj.getJobID());
+ HadoopJobExecHelper.runningJobs.remove(rj);
jobID = rj.getID().toString();
}
} catch (Exception e) {
Modified: hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java?rev=1577361&r1=1577360&r2=1577361&view=diff
==============================================================================
--- hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java (original)
+++ hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java Thu Mar 13 22:47:37 2014
@@ -26,6 +26,7 @@ import java.util.Calendar;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashMap;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -50,6 +51,7 @@ import org.apache.hadoop.mapred.Counters
import org.apache.hadoop.mapred.Counters.Counter;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.JobStatus;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TaskCompletionEvent;
@@ -67,9 +69,9 @@ public class HadoopJobExecHelper {
protected transient int mapProgress = 0;
protected transient int reduceProgress = 0;
- public transient String jobId;
- private LogHelper console;
- private HadoopJobExecHook callBackObj;
+ public transient JobID jobId;
+ private final LogHelper console;
+ private final HadoopJobExecHook callBackObj;
/**
* Update counters relevant to this task.
@@ -89,7 +91,7 @@ public class HadoopJobExecHelper {
* @param jobId
* @return
*/
- private static String getJobStartMsg(String jobId) {
+ private static String getJobStartMsg(JobID jobId) {
return "Starting Job = " + jobId;
}
@@ -99,7 +101,7 @@ public class HadoopJobExecHelper {
* @param jobId
* @return the job end message
*/
- public static String getJobEndMsg(String jobId) {
+ public static String getJobEndMsg(JobID jobId) {
return "Ended Job = " + jobId;
}
@@ -120,11 +122,11 @@ public class HadoopJobExecHelper {
}
- public String getJobId() {
+ public JobID getJobId() {
return jobId;
}
- public void setJobId(String jobId) {
+ public void setJobId(JobID jobId) {
this.jobId = jobId;
}
@@ -148,8 +150,8 @@ public class HadoopJobExecHelper {
* running jobs in the event of an unexpected shutdown - i.e., the JVM shuts
down while there are
* still jobs running.
*/
- public static Map<String, String> runningJobKillURIs = Collections
- .synchronizedMap(new HashMap<String, String>());
+ public static List<RunningJob> runningJobs = Collections
+ .synchronizedList(new LinkedList<RunningJob>());
/**
@@ -161,32 +163,23 @@ public class HadoopJobExecHelper {
*
*/
static {
- if (new org.apache.hadoop.conf.Configuration()
- .getBoolean("webinterface.private.actions", false)) {
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
killRunningJobs();
}
});
- }
}
public static void killRunningJobs() {
- synchronized (runningJobKillURIs) {
- for (String uri : runningJobKillURIs.values()) {
+ synchronized (runningJobs) {
+ for (RunningJob rj : runningJobs) {
try {
- System.err.println("killing job with: " + uri);
- java.net.HttpURLConnection conn = (java.net.HttpURLConnection) new
java.net.URL(uri)
- .openConnection();
- conn.setRequestMethod("POST");
- int retCode = conn.getResponseCode();
- if (retCode != 200) {
- System.err.println("Got an error trying to kill job with URI: " +
uri + " = "
- + retCode);
- }
+ System.err.println("killing job with: " + rj.getID());
+ rj.killJob();
} catch (Exception e) {
- System.err.println("trying to kill job, caught: " + e);
+ LOG.warn(e);
+ System.err.println("Failed to kill job: "+ rj.getID());
// do nothing
}
}
@@ -252,7 +245,7 @@ public class HadoopJobExecHelper {
String logMapper;
String logReducer;
- TaskReport[] mappers = jc.getMapTaskReports(rj.getJobID());
+ TaskReport[] mappers = jc.getMapTaskReports(rj.getID());
if (mappers == null) {
logMapper = "no information for number of mappers; ";
} else {
@@ -264,7 +257,7 @@ public class HadoopJobExecHelper {
logMapper = "number of mappers: " + numMap + "; ";
}
- TaskReport[] reducers = jc.getReduceTaskReports(rj.getJobID());
+ TaskReport[] reducers = jc.getReduceTaskReports(rj.getID());
if (reducers == null) {
logReducer = "no information for number of reducers. ";
} else {
@@ -281,13 +274,13 @@ public class HadoopJobExecHelper {
initOutputPrinted = true;
}
- RunningJob newRj = jc.getJob(rj.getJobID());
+ RunningJob newRj = jc.getJob(rj.getID());
if (newRj == null) {
// under exceptional load, hadoop may not be able to look up status
// of finished jobs (because it has purged them from memory). From
// hive's perspective - it's equivalent to the job having failed.
// So raise a meaningful exception
- throw new IOException("Could not find status of job:" + rj.getJobID());
+ throw new IOException("Could not find status of job:" + rj.getID());
} else {
th.setRunningJob(newRj);
rj = newRj;
@@ -428,12 +421,12 @@ public class HadoopJobExecHelper {
} else {
if (SessionState.get() != null) {
SessionState.get().getHiveHistory().setTaskProperty(SessionState.get().getQueryId(),
- getId(), Keys.TASK_HADOOP_ID, rj.getJobID());
+ getId(), Keys.TASK_HADOOP_ID, rj.getID().toString());
}
- console.printInfo(getJobStartMsg(rj.getJobID()) + ", Tracking URL = "
+ console.printInfo(getJobStartMsg(rj.getID()) + ", Tracking URL = "
+ rj.getTrackingURL());
console.printInfo("Kill Command = " + HiveConf.getVar(job,
HiveConf.ConfVars.HADOOPBIN)
- + " job -kill " + rj.getJobID());
+ + " job -kill " + rj.getID());
}
}
@@ -509,7 +502,7 @@ public class HadoopJobExecHelper {
public int progress(RunningJob rj, JobClient jc) throws IOException {
- jobId = rj.getJobID();
+ jobId = rj.getID();
int returnVal = 0;
@@ -527,7 +520,7 @@ public class HadoopJobExecHelper {
// add to list of running jobs to kill in case of abnormal shutdown
- runningJobKillURIs.put(rj.getJobID(), rj.getTrackingURL() +
"&action=kill");
+ runningJobs.add(rj);
ExecDriverTaskHandle th = new ExecDriverTaskHandle(jc, rj);
jobInfo(rj);
@@ -548,7 +541,7 @@ public class HadoopJobExecHelper {
boolean success = mapRedStats.isSuccess();
- String statusMesg = getJobEndMsg(rj.getJobID());
+ String statusMesg = getJobEndMsg(rj.getID());
if (!success) {
statusMesg += " with errors";
returnVal = 2;
@@ -592,8 +585,7 @@ public class HadoopJobExecHelper {
}
}
// Compute the reducers run time statistics for the job
- ReducerTimeStatsPerJob reducerTimeStatsPerJob = new
ReducerTimeStatsPerJob(reducersRunTimes,
- new String(this.jobId));
+ ReducerTimeStatsPerJob reducerTimeStatsPerJob = new
ReducerTimeStatsPerJob(reducersRunTimes);
// Adding the reducers run time statistics for the job in the QueryPlan
this.task.getQueryPlan().getReducerTimeStatsPerJobList().add(reducerTimeStatsPerJob);
return;
Modified: hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/BlockMergeTask.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/BlockMergeTask.java?rev=1577361&r1=1577360&r2=1577361&view=diff
==============================================================================
--- hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/BlockMergeTask.java (original)
+++ hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/BlockMergeTask.java Thu Mar 13 22:47:37 2014
@@ -241,7 +241,7 @@ public class BlockMergeTask extends Task
if (returnVal != 0) {
rj.killJob();
}
- HadoopJobExecHelper.runningJobKillURIs.remove(rj.getJobID());
+ HadoopJobExecHelper.runningJobs.remove(rj);
jobID = rj.getID().toString();
}
RCFileMergeMapper.jobClose(outputPath, success, job, console,
@@ -372,5 +372,5 @@ public class BlockMergeTask extends Task
@Override
public void logPlanProgress(SessionState ss) throws IOException {
// no op
- }
+ }
}
Modified: hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java?rev=1577361&r1=1577360&r2=1577361&view=diff
==============================================================================
--- hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java (original)
+++ hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java Thu Mar 13 22:47:37 2014
@@ -248,7 +248,7 @@ public class PartialScanTask extends Tas
if (returnVal != 0) {
rj.killJob();
}
- HadoopJobExecHelper.runningJobKillURIs.remove(rj.getJobID());
+ HadoopJobExecHelper.runningJobs.remove(rj);
jobID = rj.getID().toString();
}
} catch (Exception e) {
Modified: hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java?rev=1577361&r1=1577360&r2=1577361&view=diff
==============================================================================
--- hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java (original)
+++ hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java Thu Mar 13 22:47:37 2014
@@ -217,7 +217,7 @@ public class ColumnTruncateTask extends
if (returnVal != 0) {
rj.killJob();
}
- HadoopJobExecHelper.runningJobKillURIs.remove(rj.getJobID());
+ HadoopJobExecHelper.runningJobs.remove(rj);
jobID = rj.getID().toString();
}
ColumnTruncateMapper.jobClose(outputPath, success, job, console,
Modified: hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/plan/ReducerTimeStatsPerJob.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/plan/ReducerTimeStatsPerJob.java?rev=1577361&r1=1577360&r2=1577361&view=diff
==============================================================================
--- hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/plan/ReducerTimeStatsPerJob.java (original)
+++ hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/plan/ReducerTimeStatsPerJob.java Thu Mar 13 22:47:37 2014
@@ -32,9 +32,6 @@ import java.util.List;
*/
public class ReducerTimeStatsPerJob {
- // stores the JobId of the job
- private final String jobId;
-
// Stores the temporal statistics in milliseconds for reducers
// specific to a Job
private final long minimumTime;
@@ -47,8 +44,7 @@ public class ReducerTimeStatsPerJob {
* Computes the temporal run time statistics of the reducers
* for a specific JobId.
*/
- public ReducerTimeStatsPerJob(List<Integer> reducersRunTimes, String jobId) {
- this.jobId = jobId;
+ public ReducerTimeStatsPerJob(List<Integer> reducersRunTimes) {
// If no Run times present, then set -1, indicating no values
if (!reducersRunTimes.isEmpty()) {
@@ -103,9 +99,4 @@ public class ReducerTimeStatsPerJob {
public double getStandardDeviationTime() {
return this.standardDeviationTime;
}
-
- public String getJobId() {
- return this.jobId;
- }
-
}