Repository: hive
Updated Branches:
  refs/heads/branch-1 21d6578f2 -> e4937c033


HIVE-11997 - Add ability to send Compaction Jobs to specific queue (Eugene 
Koifman, reviewed by Jason Dere)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/a4ff7adc
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/a4ff7adc
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/a4ff7adc

Branch: refs/heads/branch-1
Commit: a4ff7adce6674f356b98ae93427459247b1a6767
Parents: 21d6578
Author: Eugene Koifman <[email protected]>
Authored: Fri Oct 2 10:43:48 2015 -0700
Committer: Eugene Koifman <[email protected]>
Committed: Fri Oct 2 10:43:48 2015 -0700

----------------------------------------------------------------------
 common/src/java/org/apache/hadoop/hive/conf/HiveConf.java |  2 ++
 .../apache/hadoop/hive/ql/txn/compactor/CompactorMR.java  | 10 +++++++++-
 2 files changed, 11 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/a4ff7adc/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java 
b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 10c4548..fbbbcaa 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1507,6 +1507,8 @@ public class HiveConf extends Configuration {
 
     HIVE_COMPACTOR_CLEANER_RUN_INTERVAL("hive.compactor.cleaner.run.interval", 
"5000ms",
         new TimeValidator(TimeUnit.MILLISECONDS), "Time between runs of the 
cleaner thread"),
+    COMPACTOR_JOB_QUEUE("hive.compactor.job.queue", "", "Used to specify name 
of Hadoop queue to which\n" +
+      "Compaction jobs will be submitted.  Set to empty string to let Hadoop 
choose the queue."),
     HIVE_TIMEDOUT_TXN_REAPER_START("hive.timedout.txn.reaper.start", "100s",
       new TimeValidator(TimeUnit.MILLISECONDS), "Time delay of 1st reaper run 
after metastore start"),
     HIVE_TIMEDOUT_TXN_REAPER_INTERVAL("hive.timedout.txn.reaper.interval", 
"180s",

http://git-wip-us.apache.org/repos/asf/hive/blob/a4ff7adc/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java 
b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
index 8e431b2..7bc01d9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
@@ -116,6 +116,11 @@ public class CompactorMR {
     job.setInputFormat(CompactorInputFormat.class);
     job.setOutputFormat(NullOutputFormat.class);
     job.setOutputCommitter(CompactorOutputCommitter.class);
+    
+    String queueName = conf.getVar(HiveConf.ConfVars.COMPACTOR_JOB_QUEUE);
+    if(queueName != null && queueName.length() > 0) {
+      job.setQueueName(queueName);
+    }
 
     job.set(FINAL_LOCATION, sd.getLocation());
     job.set(TMP_LOCATION, sd.getLocation() + "/" + TMPDIR + "_" + 
UUID.randomUUID().toString());
@@ -186,7 +191,10 @@ public class CompactorMR {
     LOG.debug("Setting maximume transaction to " + maxTxn);
 
     RunningJob rj = JobClient.runJob(job);
-    LOG.info("Submitted " + (isMajor ? CompactionType.MAJOR : 
CompactionType.MINOR) + " compaction job '" + jobName + "' with jobID=" + 
rj.getID());
+    LOG.info("Submitted " + (isMajor ? CompactionType.MAJOR : 
CompactionType.MINOR) + " compaction job '" +
+      jobName + "' with jobID=" + rj.getID() + " to " + job.getQueueName() + " 
queue.  " +
+      "(current delta dirs count=" + dir.getCurrentDirectories().size() +
+      ", obsolete delta dirs count=" + dir.getObsolete());
     rj.waitForCompletion();
     su.gatherStats();
   }

Reply via email to