Author: hashutosh
Date: Thu Mar 13 22:47:37 2014
New Revision: 1577361

URL: http://svn.apache.org/r1577361
Log:
HIVE-6609 : Doing Ctrl-C on hive cli doesn't kill running MR jobs on hadoop-2 
(Ashutosh Chauhan via Jason Dere)

Modified:
    
hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
    
hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java
    
hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/BlockMergeTask.java
    
hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java
    
hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java
    
hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/plan/ReducerTimeStatsPerJob.java

Modified: 
hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
URL: 
http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java?rev=1577361&r1=1577360&r2=1577361&view=diff
==============================================================================
--- hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java (original)
+++ hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java Thu Mar 13 22:47:37 2014
@@ -182,6 +182,7 @@ public class ExecDriver extends Task<Map
    *
    * @return true if fatal errors happened during job execution, false otherwise.
    */
+  @Override
   public boolean checkFatalErrors(Counters ctrs, StringBuilder errMsg) {
      Counters.Counter cntr = ctrs.findCounter(
         HiveConf.getVar(job, HiveConf.ConfVars.HIVECOUNTERGROUP),
@@ -450,7 +451,7 @@ public class ExecDriver extends Task<Map
           if (returnVal != 0) {
             rj.killJob();
           }
-          HadoopJobExecHelper.runningJobKillURIs.remove(rj.getJobID());
+          HadoopJobExecHelper.runningJobs.remove(rj);
           jobID = rj.getID().toString();
         }
       } catch (Exception e) {

Modified: 
hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java
URL: 
http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java?rev=1577361&r1=1577360&r2=1577361&view=diff
==============================================================================
--- hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java (original)
+++ hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java Thu Mar 13 22:47:37 2014
@@ -26,6 +26,7 @@ import java.util.Calendar;
 import java.util.Collections;
 import java.util.Enumeration;
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
@@ -50,6 +51,7 @@ import org.apache.hadoop.mapred.Counters
 import org.apache.hadoop.mapred.Counters.Counter;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobID;
 import org.apache.hadoop.mapred.JobStatus;
 import org.apache.hadoop.mapred.RunningJob;
 import org.apache.hadoop.mapred.TaskCompletionEvent;
@@ -67,9 +69,9 @@ public class HadoopJobExecHelper {
 
   protected transient int mapProgress = 0;
   protected transient int reduceProgress = 0;
-  public transient String jobId;
-  private LogHelper console;
-  private HadoopJobExecHook callBackObj;
+  public transient JobID jobId;
+  private final LogHelper console;
+  private final HadoopJobExecHook callBackObj;
 
   /**
    * Update counters relevant to this task.
@@ -89,7 +91,7 @@ public class HadoopJobExecHelper {
    * @param jobId
    * @return
    */
-  private static String getJobStartMsg(String jobId) {
+  private static String getJobStartMsg(JobID jobId) {
     return "Starting Job = " + jobId;
   }
 
@@ -99,7 +101,7 @@ public class HadoopJobExecHelper {
    * @param jobId
    * @return the job end message
    */
-  public static String getJobEndMsg(String jobId) {
+  public static String getJobEndMsg(JobID jobId) {
     return "Ended Job = " + jobId;
   }
 
@@ -120,11 +122,11 @@ public class HadoopJobExecHelper {
   }
 
 
-  public String getJobId() {
+  public JobID getJobId() {
     return jobId;
   }
 
-  public void setJobId(String jobId) {
+  public void setJobId(JobID jobId) {
     this.jobId = jobId;
   }
 
@@ -148,8 +150,8 @@ public class HadoopJobExecHelper {
    * running jobs in the event of an unexpected shutdown - i.e., the JVM shuts down while there are
    * still jobs running.
    */
-  public static Map<String, String> runningJobKillURIs = Collections
-      .synchronizedMap(new HashMap<String, String>());
+  public static List<RunningJob> runningJobs = Collections
+      .synchronizedList(new LinkedList<RunningJob>());
 
 
   /**
@@ -161,32 +163,23 @@ public class HadoopJobExecHelper {
    *
    */
   static {
-    if (new org.apache.hadoop.conf.Configuration()
-        .getBoolean("webinterface.private.actions", false)) {
       Runtime.getRuntime().addShutdownHook(new Thread() {
         @Override
         public void run() {
           killRunningJobs();
         }
       });
-    }
   }
 
   public static void killRunningJobs() {
-    synchronized (runningJobKillURIs) {
-      for (String uri : runningJobKillURIs.values()) {
+    synchronized (runningJobs) {
+      for (RunningJob rj : runningJobs) {
         try {
-          System.err.println("killing job with: " + uri);
-          java.net.HttpURLConnection conn = (java.net.HttpURLConnection) new java.net.URL(uri)
-               .openConnection();
-          conn.setRequestMethod("POST");
-          int retCode = conn.getResponseCode();
-          if (retCode != 200) {
-            System.err.println("Got an error trying to kill job with URI: " + uri + " = "
-                + retCode);
-          }
+          System.err.println("killing job with: " + rj.getID());
+          rj.killJob();
         } catch (Exception e) {
-          System.err.println("trying to kill job, caught: " + e);
+          LOG.warn(e);
+          System.err.println("Failed to kill job: "+ rj.getID());
           // do nothing
         }
       }
@@ -252,7 +245,7 @@ public class HadoopJobExecHelper {
         String logMapper;
         String logReducer;
 
-        TaskReport[] mappers = jc.getMapTaskReports(rj.getJobID());
+        TaskReport[] mappers = jc.getMapTaskReports(rj.getID());
         if (mappers == null) {
           logMapper = "no information for number of mappers; ";
         } else {
@@ -264,7 +257,7 @@ public class HadoopJobExecHelper {
           logMapper = "number of mappers: " + numMap + "; ";
         }
 
-        TaskReport[] reducers = jc.getReduceTaskReports(rj.getJobID());
+        TaskReport[] reducers = jc.getReduceTaskReports(rj.getID());
         if (reducers == null) {
           logReducer = "no information for number of reducers. ";
         } else {
@@ -281,13 +274,13 @@ public class HadoopJobExecHelper {
         initOutputPrinted = true;
       }
 
-      RunningJob newRj = jc.getJob(rj.getJobID());
+      RunningJob newRj = jc.getJob(rj.getID());
       if (newRj == null) {
         // under exceptional load, hadoop may not be able to look up status
         // of finished jobs (because it has purged them from memory). From
         // hive's perspective - it's equivalent to the job having failed.
         // So raise a meaningful exception
-        throw new IOException("Could not find status of job:" + rj.getJobID());
+        throw new IOException("Could not find status of job:" + rj.getID());
       } else {
         th.setRunningJob(newRj);
         rj = newRj;
@@ -428,12 +421,12 @@ public class HadoopJobExecHelper {
     } else {
       if (SessionState.get() != null) {
         SessionState.get().getHiveHistory().setTaskProperty(SessionState.get().getQueryId(),
-            getId(), Keys.TASK_HADOOP_ID, rj.getJobID());
+            getId(), Keys.TASK_HADOOP_ID, rj.getID().toString());
       }
-      console.printInfo(getJobStartMsg(rj.getJobID()) + ", Tracking URL = "
+      console.printInfo(getJobStartMsg(rj.getID()) + ", Tracking URL = "
           + rj.getTrackingURL());
       console.printInfo("Kill Command = " + HiveConf.getVar(job, HiveConf.ConfVars.HADOOPBIN)
-          + " job  -kill " + rj.getJobID());
+          + " job  -kill " + rj.getID());
     }
   }
 
@@ -509,7 +502,7 @@ public class HadoopJobExecHelper {
 
 
   public int progress(RunningJob rj, JobClient jc) throws IOException {
-    jobId = rj.getJobID();
+    jobId = rj.getID();
 
     int returnVal = 0;
 
@@ -527,7 +520,7 @@ public class HadoopJobExecHelper {
 
     // add to list of running jobs to kill in case of abnormal shutdown
 
-    runningJobKillURIs.put(rj.getJobID(), rj.getTrackingURL() + "&action=kill");
+    runningJobs.add(rj);
 
     ExecDriverTaskHandle th = new ExecDriverTaskHandle(jc, rj);
     jobInfo(rj);
@@ -548,7 +541,7 @@ public class HadoopJobExecHelper {
 
     boolean success = mapRedStats.isSuccess();
 
-    String statusMesg = getJobEndMsg(rj.getJobID());
+    String statusMesg = getJobEndMsg(rj.getID());
     if (!success) {
       statusMesg += " with errors";
       returnVal = 2;
@@ -592,8 +585,7 @@ public class HadoopJobExecHelper {
       }
     }
     // Compute the reducers run time statistics for the job
-    ReducerTimeStatsPerJob reducerTimeStatsPerJob = new ReducerTimeStatsPerJob(reducersRunTimes,
-        new String(this.jobId));
+    ReducerTimeStatsPerJob reducerTimeStatsPerJob = new ReducerTimeStatsPerJob(reducersRunTimes);
     // Adding the reducers run time statistics for the job in the QueryPlan
     this.task.getQueryPlan().getReducerTimeStatsPerJobList().add(reducerTimeStatsPerJob);
     return;

Modified: 
hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/BlockMergeTask.java
URL: 
http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/BlockMergeTask.java?rev=1577361&r1=1577360&r2=1577361&view=diff
==============================================================================
--- hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/BlockMergeTask.java (original)
+++ hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/BlockMergeTask.java Thu Mar 13 22:47:37 2014
@@ -241,7 +241,7 @@ public class BlockMergeTask extends Task
           if (returnVal != 0) {
             rj.killJob();
           }
-          HadoopJobExecHelper.runningJobKillURIs.remove(rj.getJobID());
+          HadoopJobExecHelper.runningJobs.remove(rj);
           jobID = rj.getID().toString();
         }
         RCFileMergeMapper.jobClose(outputPath, success, job, console,
@@ -372,5 +372,5 @@ public class BlockMergeTask extends Task
   @Override
   public void logPlanProgress(SessionState ss) throws IOException {
     // no op
-  }  
+  }
 }

Modified: 
hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java
URL: 
http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java?rev=1577361&r1=1577360&r2=1577361&view=diff
==============================================================================
--- hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java (original)
+++ hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/stats/PartialScanTask.java Thu Mar 13 22:47:37 2014
@@ -248,7 +248,7 @@ public class PartialScanTask extends Tas
           if (returnVal != 0) {
             rj.killJob();
           }
-          HadoopJobExecHelper.runningJobKillURIs.remove(rj.getJobID());
+          HadoopJobExecHelper.runningJobs.remove(rj);
           jobID = rj.getID().toString();
         }
       } catch (Exception e) {

Modified: 
hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java
URL: 
http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java?rev=1577361&r1=1577360&r2=1577361&view=diff
==============================================================================
--- hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java (original)
+++ hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java Thu Mar 13 22:47:37 2014
@@ -217,7 +217,7 @@ public class ColumnTruncateTask extends 
           if (returnVal != 0) {
             rj.killJob();
           }
-          HadoopJobExecHelper.runningJobKillURIs.remove(rj.getJobID());
+          HadoopJobExecHelper.runningJobs.remove(rj);
           jobID = rj.getID().toString();
         }
         ColumnTruncateMapper.jobClose(outputPath, success, job, console,

Modified: 
hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/plan/ReducerTimeStatsPerJob.java
URL: 
http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/plan/ReducerTimeStatsPerJob.java?rev=1577361&r1=1577360&r2=1577361&view=diff
==============================================================================
--- hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/plan/ReducerTimeStatsPerJob.java (original)
+++ hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/plan/ReducerTimeStatsPerJob.java Thu Mar 13 22:47:37 2014
@@ -32,9 +32,6 @@ import java.util.List;
  */
 public class ReducerTimeStatsPerJob {
 
-  // stores the JobId of the job
-  private final String jobId;
-
   // Stores the temporal statistics in milliseconds for reducers
   // specific to a Job
   private final long minimumTime;
@@ -47,8 +44,7 @@ public class ReducerTimeStatsPerJob {
    * Computes the temporal run time statistics of the reducers
    * for a specific JobId.
    */
-  public ReducerTimeStatsPerJob(List<Integer> reducersRunTimes, String jobId) {
-    this.jobId = jobId;
+  public ReducerTimeStatsPerJob(List<Integer> reducersRunTimes) {
 
     // If no Run times present, then set -1, indicating no values
     if (!reducersRunTimes.isEmpty()) {
@@ -103,9 +99,4 @@ public class ReducerTimeStatsPerJob {
   public double getStandardDeviationTime() {
     return this.standardDeviationTime;
   }
-
-  public String getJobId() {
-    return this.jobId;
-  }
-
 }


Reply via email to