tomicooler commented on a change in pull request #3581:
URL: https://github.com/apache/hadoop/pull/3581#discussion_r735354777



##########
File path: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
##########
@@ -302,110 +302,125 @@ void initScheduler(Configuration configuration) throws
       IOException, YarnException {
     writeLock.lock();
     try {
-      String confProviderStr = configuration.get(
-          YarnConfiguration.SCHEDULER_CONFIGURATION_STORE_CLASS,
-          YarnConfiguration.DEFAULT_CONFIGURATION_STORE);
-      switch (confProviderStr) {
-      case YarnConfiguration.FILE_CONFIGURATION_STORE:
-        this.csConfProvider =
-            new FileBasedCSConfigurationProvider(rmContext);
-        break;
-      case YarnConfiguration.MEMORY_CONFIGURATION_STORE:
-      case YarnConfiguration.LEVELDB_CONFIGURATION_STORE:
-      case YarnConfiguration.ZK_CONFIGURATION_STORE:
-      case YarnConfiguration.FS_CONFIGURATION_STORE:
-        this.csConfProvider = new MutableCSConfigurationProvider(rmContext);
-        break;
-      default:
-        throw new IOException("Invalid configuration store class: " +
-            confProviderStr);
-      }
+      this.csConfProvider = getCsConfProvider(configuration);
       this.csConfProvider.init(configuration);
       this.conf = this.csConfProvider.loadConfiguration(configuration);
       validateConf(this.conf);
       this.minimumAllocation = super.getMinimumAllocation();
       initMaximumResourceCapability(super.getMaximumAllocation());
-      this.calculator = this.conf.getResourceCalculator();
-      if (this.calculator instanceof DefaultResourceCalculator
-          && ResourceUtils.getNumberOfKnownResourceTypes() > 2) {
-        throw new YarnRuntimeException("RM uses DefaultResourceCalculator 
which"
-            + " used only memory as resource-type but invalid resource-types"
-            + " specified " + ResourceUtils.getResourceTypes() + ". Use"
-            + " DominantResourceCalculator instead to make effective use of"
-            + " these resource-types");
-      }
+      this.calculator = initResourceCalculator();
       this.usePortForNodeName = this.conf.getUsePortForNodeName();
       this.applications = new ConcurrentHashMap<>();
       this.labelManager = rmContext.getNodeLabelManager();
       this.appPriorityACLManager = new AppPriorityACLsManager(conf);
       this.queueManager = new CapacitySchedulerQueueManager(yarnConf,
           this.labelManager, this.appPriorityACLManager);
       this.queueManager.setCapacitySchedulerContext(this);
-
       this.workflowPriorityMappingsMgr = new WorkflowPriorityMappingsManager();
-
       this.activitiesManager = new ActivitiesManager(rmContext);
       activitiesManager.init(conf);
       initializeQueues(this.conf);
       this.isLazyPreemptionEnabled = conf.getLazyPreemptionEnabled();
-
-      scheduleAsynchronously = this.conf.getScheduleAynschronously();
-      asyncScheduleInterval = this.conf.getLong(ASYNC_SCHEDULER_INTERVAL,
-          DEFAULT_ASYNC_SCHEDULER_INTERVAL);
-
       this.assignMultipleEnabled = this.conf.getAssignMultipleEnabled();
       this.maxAssignPerHeartbeat = this.conf.getMaxAssignPerHeartbeat();
-
-      this.appShouldFailFast = 
CapacitySchedulerConfiguration.shouldAppFailFast(
-          getConfig());
-
-      // number of threads for async scheduling
-      int maxAsyncSchedulingThreads = this.conf.getInt(
-          
CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_THREAD,
-          1);
-      maxAsyncSchedulingThreads = Math.max(maxAsyncSchedulingThreads, 1);
-
-      if (scheduleAsynchronously) {
-        asyncSchedulerThreads = new ArrayList<>();
-        for (int i = 0; i < maxAsyncSchedulingThreads; i++) {
-          asyncSchedulerThreads.add(new AsyncScheduleThread(this));
-        }
-        resourceCommitterService = new ResourceCommitterService(this);
-        asyncMaxPendingBacklogs = this.conf.getInt(
-            CapacitySchedulerConfiguration.
-                SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_PENDING_BACKLOGS,
-            CapacitySchedulerConfiguration.
-                DEFAULT_SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_PENDING_BACKLOGS);
-      }
+      this.appShouldFailFast = 
CapacitySchedulerConfiguration.shouldAppFailFast(getConfig());
+      initAsyncSchedulingProperties();
 
       // Setup how many containers we can allocate for each round
       offswitchPerHeartbeatLimit = this.conf.getOffSwitchPerHeartbeatLimit();
 
-      // Register CS specific multi-node policies to common MultiNodeManager
-      // which will add to a MultiNodeSorter which gives a pre-sorted list of
-      // nodes to scheduler's allocation.
-      multiNodePlacementEnabled = this.conf.getMultiNodePlacementEnabled();
-      if(rmContext.getMultiNodeSortingManager() != null) {
-        rmContext.getMultiNodeSortingManager().registerMultiNodePolicyNames(
-            multiNodePlacementEnabled,
-            this.conf.getMultiNodePlacementPolicies());
-      }
-
-      LOG.info("Initialized CapacityScheduler with " + "calculator="
-          + getResourceCalculator().getClass() + ", " + "minimumAllocation="
-          + getMinimumResourceCapability() + ", " + "maximumAllocation="
-          + getMaximumResourceCapability() + ", " + "asynchronousScheduling="
-          + scheduleAsynchronously + ", " + "asyncScheduleInterval="
-          + asyncScheduleInterval + "ms" + ",multiNodePlacementEnabled="
-          + multiNodePlacementEnabled + ", " + "assignMultipleEnabled="
-          + assignMultipleEnabled + ", " + "maxAssignPerHeartbeat="
-          + maxAssignPerHeartbeat + ", " + "offswitchPerHeartbeatLimit="
-          + offswitchPerHeartbeatLimit);
+      initMultiNodePlacement();
+      printSchedulerInitialized();
     } finally {
       writeLock.unlock();
     }
   }
 
+  private CSConfigurationProvider getCsConfProvider(Configuration 
configuration) throws IOException {
+    String confProviderStr = configuration.get(
+        YarnConfiguration.SCHEDULER_CONFIGURATION_STORE_CLASS,
+        YarnConfiguration.DEFAULT_CONFIGURATION_STORE);
+    switch (confProviderStr) {
+      case YarnConfiguration.FILE_CONFIGURATION_STORE:
+        this.csConfProvider =
+            new FileBasedCSConfigurationProvider(rmContext);
+        break;
+      case YarnConfiguration.MEMORY_CONFIGURATION_STORE:
+      case YarnConfiguration.LEVELDB_CONFIGURATION_STORE:
+      case YarnConfiguration.ZK_CONFIGURATION_STORE:
+      case YarnConfiguration.FS_CONFIGURATION_STORE:
+        return new MutableCSConfigurationProvider(rmContext);
+      default:
+        throw new IOException("Invalid configuration store class: " + 
confProviderStr);
+    }
+    return null;
+  }
+
+  private ResourceCalculator initResourceCalculator() {
+    ResourceCalculator resourceCalculator = this.conf.getResourceCalculator();
+    if (this.calculator instanceof DefaultResourceCalculator

Review comment:
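   The check should use the local `resourceCalculator` variable instead of `this.calculator`: the field is only assigned from this method's return value, so at this point it still holds its previous value.

   ```
   if (resourceCalculator instanceof...)
   ```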
   
   I think this breaks the unit tests:
   
   ```
   [ERROR] testDefaultResourceCalculatorWithThirdResourceTypes(org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacitySchedulerWithMultiResourceTypes)  Time elapsed: 0.139 s  <<< FAILURE!
   java.lang.AssertionError: Should have exception in CS
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to