Repository: hive
Updated Branches:
  refs/heads/master ff9822eb3 -> a1bac802a

HIVE-11997 - Add ability to send Compaction Jobs to specific queue (Eugene Koifman, reviewed by Jason Dere)

Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/b1eb0c0f
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/b1eb0c0f
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/b1eb0c0f

Branch: refs/heads/master
Commit: b1eb0c0f1c3fc0f503bc675281c8be8356d1f081
Parents: ff9822e
Author: Eugene Koifman <[email protected]>
Authored: Fri Oct 2 10:11:00 2015 -0700
Committer: Eugene Koifman <[email protected]>
Committed: Fri Oct 2 10:11:00 2015 -0700

----------------------------------------------------------------------
 common/src/java/org/apache/hadoop/hive/conf/HiveConf.java | 2 ++
 .../apache/hadoop/hive/ql/txn/compactor/CompactorMR.java | 10 +++++++++-
 2 files changed, 11 insertions(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/b1eb0c0f/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index dffdb5c..e7ed07e 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1571,6 +1571,8 @@ public class HiveConf extends Configuration {
     HIVE_COMPACTOR_CLEANER_RUN_INTERVAL("hive.compactor.cleaner.run.interval", "5000ms",
         new TimeValidator(TimeUnit.MILLISECONDS), "Time between runs of the cleaner thread"),
+    COMPACTOR_JOB_QUEUE("hive.compactor.job.queue", "", "Used to specify name of Hadoop queue to which\n" +
+      "Compaction jobs will be submitted. Set to empty string to let Hadoop choose the queue."),
     HIVE_TIMEDOUT_TXN_REAPER_START("hive.timedout.txn.reaper.start", "100s",
       new TimeValidator(TimeUnit.MILLISECONDS), "Time delay of 1st reaper run after metastore start"),
     HIVE_TIMEDOUT_TXN_REAPER_INTERVAL("hive.timedout.txn.reaper.interval", "180s",

http://git-wip-us.apache.org/repos/asf/hive/blob/b1eb0c0f/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
index 02fa725..3ee9346 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
@@ -117,6 +117,11 @@ public class CompactorMR {
     job.setInputFormat(CompactorInputFormat.class);
     job.setOutputFormat(NullOutputFormat.class);
     job.setOutputCommitter(CompactorOutputCommitter.class);
+
+    String queueName = conf.getVar(HiveConf.ConfVars.COMPACTOR_JOB_QUEUE);
+    if(queueName != null && queueName.length() > 0) {
+      job.setQueueName(queueName);
+    }
     job.set(FINAL_LOCATION, sd.getLocation());
     job.set(TMP_LOCATION, sd.getLocation() + "/" + TMPDIR + "_" + UUID.randomUUID().toString());
@@ -189,7 +194,10 @@ public class CompactorMR {
     LOG.debug("Setting maximume transaction to " + maxTxn);
     RunningJob rj = JobClient.runJob(job);
-    LOG.info("Submitted " + (isMajor ? CompactionType.MAJOR : CompactionType.MINOR) + " compaction job '" + jobName + "' with jobID=" + rj.getID());
+    LOG.info("Submitted " + (isMajor ? CompactionType.MAJOR : CompactionType.MINOR) + " compaction job '" +
+      jobName + "' with jobID=" + rj.getID() + " to " + job.getQueueName() + " queue. " +
+      "(current delta dirs count=" + dir.getCurrentDirectories().size() +
+      ", obsolete delta dirs count=" + dir.getObsolete());
     rj.waitForCompletion();
     su.gatherStats();
   }
