Repository: hive Updated Branches: refs/heads/branch-1.2 82a7c82ed -> fed9a0b5f
HIVE-8915 Log file explosion due to non-existence of COMPACTION_QUEUE table (Alan Gates, reviewed by Eugene Koifman) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/fed9a0b5 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/fed9a0b5 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/fed9a0b5 Branch: refs/heads/branch-1.2 Commit: fed9a0b5fb92f1a03870340f0f190e557cc37202 Parents: 82a7c82 Author: Alan Gates <ga...@hortonworks.com> Authored: Tue May 5 09:19:53 2015 -0700 Committer: Alan Gates <ga...@hortonworks.com> Committed: Tue May 5 09:19:53 2015 -0700 ---------------------------------------------------------------------- .../hadoop/hive/ql/txn/compactor/Cleaner.java | 18 ++++++++---- .../hadoop/hive/ql/txn/compactor/Worker.java | 30 ++++++++++++++------ 2 files changed, 33 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/fed9a0b5/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java index 0fb39f7..83b0d3d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java @@ -70,10 +70,10 @@ public class Cleaner extends CompactorThread { // and if so remembers that and then sets it to true at the end. We have to check here // first to make sure we go through a complete iteration of the loop before resetting it. boolean setLooped = !looped.get(); + long startedAt = System.currentTimeMillis(); // Make sure nothing escapes this run method and kills the metastore at large, // so wrap it in a big catch Throwable statement. try { - long startedAt = System.currentTimeMillis(); // First look for all the compactions that are waiting to be cleaned. If we have not // seen an entry before, look for all the locks held on that table or partition and @@ -134,11 +134,6 @@ public class Cleaner extends CompactorThread { } } } - - // Now, go back to bed until it's time to do this again - long elapsedTime = System.currentTimeMillis() - startedAt; - if (elapsedTime >= cleanerCheckInterval || stop.get()) continue; - else Thread.sleep(cleanerCheckInterval - elapsedTime); } catch (Throwable t) { LOG.error("Caught an exception in the main loop of compactor cleaner, " + StringUtils.stringifyException(t)); @@ -146,6 +141,17 @@ public class Cleaner extends CompactorThread { if (setLooped) { looped.set(true); } + // Now, go back to bed until it's time to do this again + long elapsedTime = System.currentTimeMillis() - startedAt; + if (elapsedTime >= cleanerCheckInterval || stop.get()) { + continue; + } else { + try { + Thread.sleep(cleanerCheckInterval - elapsedTime); + } catch (InterruptedException ie) { + // What can I do about it? + } + } } while (!stop.get()); } http://git-wip-us.apache.org/repos/asf/hive/blob/fed9a0b5/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java index 8cfa37e..3ce9ffd 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java @@ -73,10 +73,11 @@ public class Worker extends CompactorThread { @Override public void run() { - // Make sure nothing escapes this run method and kills the metastore at large, - // so wrap it in a big catch Throwable statement. - try { - do { + do { + boolean launchedJob = false; + // Make sure nothing escapes this run method and kills the metastore at large, + // so wrap it in a big catch Throwable statement. + try { CompactionInfo ci = txnHandler.findNextToCompact(name); if (ci == null && !stop.get()) { @@ -143,6 +144,7 @@ public class Worker extends CompactorThread { final StatsUpdater su = StatsUpdater.init(ci, txnHandler.findColumnsWithStats(ci), conf, runJobAsSelf(runAs) ? runAs : t.getOwner()); final CompactorMR mr = new CompactorMR(); + launchedJob = true; try { if (runJobAsSelf(runAs)) { mr.run(conf, jobName.toString(), t, sd, txns, isMajor, su); @@ -163,11 +165,21 @@ public class Worker extends CompactorThread { ". Marking clean to avoid repeated failures, " + StringUtils.stringifyException(e)); txnHandler.markCleaned(ci); } - } while (!stop.get()); - } catch (Throwable t) { - LOG.error("Caught an exception in the main loop of compactor worker " + name + - ", exiting " + StringUtils.stringifyException(t)); - } + } catch (Throwable t) { + LOG.error("Caught an exception in the main loop of compactor worker " + name + ", " + + StringUtils.stringifyException(t)); + } + + // If we didn't try to launch a job it either means there was no work to do or we got + // here as the result of a communication failure with the DB. Either way we want to wait + // a bit before we restart the loop. + if (!launchedJob && !stop.get()) { + try { + Thread.sleep(SLEEP_TIME); + } catch (InterruptedException e) { + } + } + } while (!stop.get()); } @Override