Repository: hadoop Updated Branches: refs/heads/trunk e9c37de48 -> f4357240a
YARN-2608. FairScheduler: Potential deadlocks in loading alloc files and clock access. (Wei Yan via kasha) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/f4357240 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/f4357240 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/f4357240 Branch: refs/heads/trunk Commit: f4357240a6f81065d91d5f443ed8fc8cd2a14a8f Parents: e9c37de Author: Karthik Kambatla <[email protected]> Authored: Thu Sep 25 17:42:47 2014 -0700 Committer: Karthik Kambatla <[email protected]> Committed: Thu Sep 25 17:42:51 2014 -0700 ---------------------------------------------------------------------- hadoop-yarn-project/CHANGES.txt | 3 + .../scheduler/fair/FairScheduler.java | 122 ++++++++++--------- 2 files changed, 65 insertions(+), 60 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/f4357240/hadoop-yarn-project/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index bbda48d..d0cbcd2 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -450,6 +450,9 @@ Release 2.6.0 - UNRELEASED YARN-2523. ResourceManager UI showing negative value for "Decommissioned Nodes" field (Rohith via jlowe) + YARN-2608. FairScheduler: Potential deadlocks in loading alloc files and + clock access. (Wei Yan via kasha) + Release 2.5.1 - 2014-09-05 INCOMPATIBLE CHANGES http://git-wip-us.apache.org/repos/asf/hadoop/blob/f4357240/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 296d884..d633981 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -117,7 +117,7 @@ public class FairScheduler extends private Resource incrAllocation; private QueueManager queueMgr; - private Clock clock; + private volatile Clock clock; private boolean usePortForNodeName; private static final Log LOG = LogFactory.getLog(FairScheduler.class); @@ -555,11 +555,12 @@ public class FairScheduler extends return continuousSchedulingSleepMs; } - public synchronized Clock getClock() { + public Clock getClock() { return clock; } - protected synchronized void setClock(Clock clock) { + @VisibleForTesting + void setClock(Clock clock) { this.clock = clock; } @@ -1204,64 +1205,65 @@ public class FairScheduler extends this.rmContext = rmContext; } - private synchronized void initScheduler(Configuration conf) - throws IOException { - this.conf = new FairSchedulerConfiguration(conf); - validateConf(this.conf); - minimumAllocation = this.conf.getMinimumAllocation(); - maximumAllocation = this.conf.getMaximumAllocation(); - incrAllocation = this.conf.getIncrementAllocation(); - continuousSchedulingEnabled = this.conf.isContinuousSchedulingEnabled(); - continuousSchedulingSleepMs = - this.conf.getContinuousSchedulingSleepMs(); - nodeLocalityThreshold = this.conf.getLocalityThresholdNode(); - rackLocalityThreshold = this.conf.getLocalityThresholdRack(); - nodeLocalityDelayMs = this.conf.getLocalityDelayNodeMs(); - rackLocalityDelayMs = this.conf.getLocalityDelayRackMs(); - preemptionEnabled = this.conf.getPreemptionEnabled(); - preemptionUtilizationThreshold = - this.conf.getPreemptionUtilizationThreshold(); - assignMultiple = this.conf.getAssignMultiple(); - maxAssign = this.conf.getMaxAssign(); - sizeBasedWeight = this.conf.getSizeBasedWeight(); - preemptionInterval = this.conf.getPreemptionInterval(); - waitTimeBeforeKill = this.conf.getWaitTimeBeforeKill(); - usePortForNodeName = this.conf.getUsePortForNodeName(); - - updateInterval = this.conf.getUpdateInterval(); - if (updateInterval < 0) { - updateInterval = FairSchedulerConfiguration.DEFAULT_UPDATE_INTERVAL_MS; - LOG.warn(FairSchedulerConfiguration.UPDATE_INTERVAL_MS - + " is invalid, so using default value " + - + FairSchedulerConfiguration.DEFAULT_UPDATE_INTERVAL_MS - + " ms instead"); - } - - rootMetrics = FSQueueMetrics.forQueue("root", null, true, conf); - fsOpDurations = FSOpDurations.getInstance(true); - - // This stores per-application scheduling information - this.applications = new ConcurrentHashMap< - ApplicationId, SchedulerApplication<FSAppAttempt>>(); - this.eventLog = new FairSchedulerEventLog(); - eventLog.init(this.conf); - - allocConf = new AllocationConfiguration(conf); - try { - queueMgr.initialize(conf); - } catch (Exception e) { - throw new IOException("Failed to start FairScheduler", e); - } + private void initScheduler(Configuration conf) throws IOException { + synchronized (this) { + this.conf = new FairSchedulerConfiguration(conf); + validateConf(this.conf); + minimumAllocation = this.conf.getMinimumAllocation(); + maximumAllocation = this.conf.getMaximumAllocation(); + incrAllocation = this.conf.getIncrementAllocation(); + continuousSchedulingEnabled = this.conf.isContinuousSchedulingEnabled(); + continuousSchedulingSleepMs = + this.conf.getContinuousSchedulingSleepMs(); + nodeLocalityThreshold = this.conf.getLocalityThresholdNode(); + rackLocalityThreshold = this.conf.getLocalityThresholdRack(); + nodeLocalityDelayMs = this.conf.getLocalityDelayNodeMs(); + rackLocalityDelayMs = this.conf.getLocalityDelayRackMs(); + preemptionEnabled = this.conf.getPreemptionEnabled(); + preemptionUtilizationThreshold = + this.conf.getPreemptionUtilizationThreshold(); + assignMultiple = this.conf.getAssignMultiple(); + maxAssign = this.conf.getMaxAssign(); + sizeBasedWeight = this.conf.getSizeBasedWeight(); + preemptionInterval = this.conf.getPreemptionInterval(); + waitTimeBeforeKill = this.conf.getWaitTimeBeforeKill(); + usePortForNodeName = this.conf.getUsePortForNodeName(); + + updateInterval = this.conf.getUpdateInterval(); + if (updateInterval < 0) { + updateInterval = FairSchedulerConfiguration.DEFAULT_UPDATE_INTERVAL_MS; + LOG.warn(FairSchedulerConfiguration.UPDATE_INTERVAL_MS + + " is invalid, so using default value " + + +FairSchedulerConfiguration.DEFAULT_UPDATE_INTERVAL_MS + + " ms instead"); + } - updateThread = new UpdateThread(); - updateThread.setName("FairSchedulerUpdateThread"); - updateThread.setDaemon(true); + rootMetrics = FSQueueMetrics.forQueue("root", null, true, conf); + fsOpDurations = FSOpDurations.getInstance(true); - if (continuousSchedulingEnabled) { - // start continuous scheduling thread - schedulingThread = new ContinuousSchedulingThread(); - schedulingThread.setName("FairSchedulerContinuousScheduling"); - schedulingThread.setDaemon(true); + // This stores per-application scheduling information + this.applications = new ConcurrentHashMap< + ApplicationId, SchedulerApplication<FSAppAttempt>>(); + this.eventLog = new FairSchedulerEventLog(); + eventLog.init(this.conf); + + allocConf = new AllocationConfiguration(conf); + try { + queueMgr.initialize(conf); + } catch (Exception e) { + throw new IOException("Failed to start FairScheduler", e); + } + + updateThread = new UpdateThread(); + updateThread.setName("FairSchedulerUpdateThread"); + updateThread.setDaemon(true); + + if (continuousSchedulingEnabled) { + // start continuous scheduling thread + schedulingThread = new ContinuousSchedulingThread(); + schedulingThread.setName("FairSchedulerContinuousScheduling"); + schedulingThread.setDaemon(true); + } } allocsLoader.init(conf); @@ -1321,7 +1323,7 @@ public class FairScheduler extends } @Override - public synchronized void reinitialize(Configuration conf, RMContext rmContext) + public void reinitialize(Configuration conf, RMContext rmContext) throws IOException { try { allocsLoader.reloadAllocations();
