szilard-nemeth commented on a change in pull request #3581:
URL: https://github.com/apache/hadoop/pull/3581#discussion_r735527668
##########
File path:
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
##########
@@ -302,110 +302,125 @@ void initScheduler(Configuration configuration) throws
IOException, YarnException {
writeLock.lock();
try {
- String confProviderStr = configuration.get(
- YarnConfiguration.SCHEDULER_CONFIGURATION_STORE_CLASS,
- YarnConfiguration.DEFAULT_CONFIGURATION_STORE);
- switch (confProviderStr) {
- case YarnConfiguration.FILE_CONFIGURATION_STORE:
- this.csConfProvider =
- new FileBasedCSConfigurationProvider(rmContext);
- break;
- case YarnConfiguration.MEMORY_CONFIGURATION_STORE:
- case YarnConfiguration.LEVELDB_CONFIGURATION_STORE:
- case YarnConfiguration.ZK_CONFIGURATION_STORE:
- case YarnConfiguration.FS_CONFIGURATION_STORE:
- this.csConfProvider = new MutableCSConfigurationProvider(rmContext);
- break;
- default:
- throw new IOException("Invalid configuration store class: " +
- confProviderStr);
- }
+ this.csConfProvider = getCsConfProvider(configuration);
this.csConfProvider.init(configuration);
this.conf = this.csConfProvider.loadConfiguration(configuration);
validateConf(this.conf);
this.minimumAllocation = super.getMinimumAllocation();
initMaximumResourceCapability(super.getMaximumAllocation());
- this.calculator = this.conf.getResourceCalculator();
- if (this.calculator instanceof DefaultResourceCalculator
- && ResourceUtils.getNumberOfKnownResourceTypes() > 2) {
- throw new YarnRuntimeException("RM uses DefaultResourceCalculator which"
- + " used only memory as resource-type but invalid resource-types"
- + " specified " + ResourceUtils.getResourceTypes() + ". Use"
- + " DominantResourceCalculator instead to make effective use of"
- + " these resource-types");
- }
+ this.calculator = initResourceCalculator();
this.usePortForNodeName = this.conf.getUsePortForNodeName();
this.applications = new ConcurrentHashMap<>();
this.labelManager = rmContext.getNodeLabelManager();
this.appPriorityACLManager = new AppPriorityACLsManager(conf);
this.queueManager = new CapacitySchedulerQueueManager(yarnConf,
this.labelManager, this.appPriorityACLManager);
this.queueManager.setCapacitySchedulerContext(this);
-
this.workflowPriorityMappingsMgr = new WorkflowPriorityMappingsManager();
-
this.activitiesManager = new ActivitiesManager(rmContext);
activitiesManager.init(conf);
initializeQueues(this.conf);
this.isLazyPreemptionEnabled = conf.getLazyPreemptionEnabled();
-
- scheduleAsynchronously = this.conf.getScheduleAynschronously();
- asyncScheduleInterval = this.conf.getLong(ASYNC_SCHEDULER_INTERVAL,
- DEFAULT_ASYNC_SCHEDULER_INTERVAL);
-
this.assignMultipleEnabled = this.conf.getAssignMultipleEnabled();
this.maxAssignPerHeartbeat = this.conf.getMaxAssignPerHeartbeat();
-
- this.appShouldFailFast = CapacitySchedulerConfiguration.shouldAppFailFast(
- getConfig());
-
- // number of threads for async scheduling
- int maxAsyncSchedulingThreads = this.conf.getInt(
- CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_THREAD,
- 1);
- maxAsyncSchedulingThreads = Math.max(maxAsyncSchedulingThreads, 1);
-
- if (scheduleAsynchronously) {
- asyncSchedulerThreads = new ArrayList<>();
- for (int i = 0; i < maxAsyncSchedulingThreads; i++) {
- asyncSchedulerThreads.add(new AsyncScheduleThread(this));
- }
- resourceCommitterService = new ResourceCommitterService(this);
- asyncMaxPendingBacklogs = this.conf.getInt(
- CapacitySchedulerConfiguration.
- SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_PENDING_BACKLOGS,
- CapacitySchedulerConfiguration.
- DEFAULT_SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_PENDING_BACKLOGS);
- }
+ this.appShouldFailFast = CapacitySchedulerConfiguration.shouldAppFailFast(getConfig());
+ initAsyncSchedulingProperties();
// Setup how many containers we can allocate for each round
offswitchPerHeartbeatLimit = this.conf.getOffSwitchPerHeartbeatLimit();
- // Register CS specific multi-node policies to common MultiNodeManager
- // which will add to a MultiNodeSorter which gives a pre-sorted list of
- // nodes to scheduler's allocation.
- multiNodePlacementEnabled = this.conf.getMultiNodePlacementEnabled();
- if(rmContext.getMultiNodeSortingManager() != null) {
- rmContext.getMultiNodeSortingManager().registerMultiNodePolicyNames(
- multiNodePlacementEnabled,
- this.conf.getMultiNodePlacementPolicies());
- }
-
- LOG.info("Initialized CapacityScheduler with " + "calculator="
- + getResourceCalculator().getClass() + ", " + "minimumAllocation="
- + getMinimumResourceCapability() + ", " + "maximumAllocation="
- + getMaximumResourceCapability() + ", " + "asynchronousScheduling="
- + scheduleAsynchronously + ", " + "asyncScheduleInterval="
- + asyncScheduleInterval + "ms" + ",multiNodePlacementEnabled="
- + multiNodePlacementEnabled + ", " + "assignMultipleEnabled="
- + assignMultipleEnabled + ", " + "maxAssignPerHeartbeat="
- + maxAssignPerHeartbeat + ", " + "offswitchPerHeartbeatLimit="
- + offswitchPerHeartbeatLimit);
+ initMultiNodePlacement();
+ printSchedulerInitialized();
} finally {
writeLock.unlock();
}
}
+ private CSConfigurationProvider getCsConfProvider(Configuration configuration) throws IOException {
+ String confProviderStr = configuration.get(
+ YarnConfiguration.SCHEDULER_CONFIGURATION_STORE_CLASS,
+ YarnConfiguration.DEFAULT_CONFIGURATION_STORE);
+ switch (confProviderStr) {
+ case YarnConfiguration.FILE_CONFIGURATION_STORE:
+ this.csConfProvider =
+ new FileBasedCSConfigurationProvider(rmContext);
+ break;
+ case YarnConfiguration.MEMORY_CONFIGURATION_STORE:
+ case YarnConfiguration.LEVELDB_CONFIGURATION_STORE:
+ case YarnConfiguration.ZK_CONFIGURATION_STORE:
+ case YarnConfiguration.FS_CONFIGURATION_STORE:
+ return new MutableCSConfigurationProvider(rmContext);
+ default:
+ throw new IOException("Invalid configuration store class: " + confProviderStr);
+ }
+ return null;
+ }
+
+ private ResourceCalculator initResourceCalculator() {
+ ResourceCalculator resourceCalculator = this.conf.getResourceCalculator();
+ if (this.calculator instanceof DefaultResourceCalculator
Review comment:
Thanks @tomicooler. Makes sense, I get the mistake now. Just fixed it.
Let's wait for the new build result.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]