[ https://issues.apache.org/jira/browse/GOBBLIN-2174?focusedWorklogId=945674&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-945674 ]
ASF GitHub Bot logged work on GOBBLIN-2174: ------------------------------------------- Author: ASF GitHub Bot Created on: 26/Nov/24 09:52 Start Date: 26/Nov/24 09:52 Worklog Time Spent: 10m Work Description: phet commented on code in PR #4077: URL: https://github.com/apache/gobblin/pull/4077#discussion_r1858153223 ########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java: ########## @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.temporal.yarn; + +import java.util.List; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.conf.YarnConfiguration; + +import com.google.common.base.Optional; +import com.google.common.eventbus.EventBus; +import com.typesafe.config.Config; + +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.temporal.dynamic.ScalingDirective; +import org.apache.gobblin.temporal.dynamic.StaffingDeltas; +import org.apache.gobblin.temporal.dynamic.WorkerProfile; +import org.apache.gobblin.temporal.dynamic.WorkforcePlan; +import org.apache.gobblin.temporal.dynamic.WorkforceStaffing; +import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys; + +/** + * Service for dynamically scaling Gobblin containers running on YARN. + * This service manages workforce staffing and plans, and requests new containers as needed. + */ +@Slf4j +public class DynamicScalingYarnService extends YarnService { + + /** this holds the current count of containers requested for each worker profile */ + private final WorkforceStaffing workforceStaffing; + /** this holds the current total workforce plan as per latest received scaling directives */ + private final WorkforcePlan workforcePlan; + + public DynamicScalingYarnService(Config config, String applicationName, String applicationId, + YarnConfiguration yarnConfiguration, FileSystem fs, EventBus eventBus) throws Exception { + super(config, applicationName, applicationId, yarnConfiguration, fs, eventBus); + + this.workforceStaffing = WorkforceStaffing.initialize(getInitialContainers()); + this.workforcePlan = new WorkforcePlan(getConfig(), getInitialContainers()); + } + + /** + * Revises the workforce plan and requests new containers based on the given scaling directives. 
+ * + * @param scalingDirectives the list of scaling directives + */ + public synchronized void reviseWorkforcePlanAndRequestNewContainers(List<ScalingDirective> scalingDirectives) { + this.workforcePlan.reviseWhenNewer(scalingDirectives); + StaffingDeltas deltas = this.workforcePlan.calcStaffingDeltas(this.workforceStaffing); + requestNewContainersForStaffingDeltas(deltas); + } + + private synchronized void requestNewContainersForStaffingDeltas(StaffingDeltas deltas) { + deltas.getPerProfileDeltas().forEach(profileDelta -> { + if (profileDelta.getDelta() > 0) { + WorkerProfile workerProfile = profileDelta.getProfile(); + String profileName = workerProfile.getName(); + int curNumContainers = this.workforceStaffing.getStaffing(profileName).orElse(0); + int delta = profileDelta.getDelta(); + log.info("Requesting {} new containers for profile {} having currently {} containers", delta, + profileName, curNumContainers); + requestContainersForWorkerProfile(workerProfile, delta); + // update our staffing after requesting new containers + this.workforceStaffing.reviseStaffing(profileName, curNumContainers + delta, System.currentTimeMillis()); + } + // TODO: Decide how to handle negative deltas Review Comment: will you add this before merge? if not, let's at least log when it's happening! 
Issue Time Tracking ------------------- Worklog Id: (was: 945674) Time Spent: 50m (was: 40m) > Add GoT YarnService integration with DynamicScaling > --------------------------------------------------- > > Key: GOBBLIN-2174 > URL: https://issues.apache.org/jira/browse/GOBBLIN-2174 > Project: Apache Gobblin > Issue Type: Bug > Components: gobblin-core > Reporter: Vivek Rai > Assignee: Abhishek Tiwari > Priority: Major > Time Spent: 50m > Remaining Estimate: 0h > > After dynamic scaling was implemented as part of > https://issues.apache.org/jira/browse/GOBBLIN-2170 , the Temporal Yarn > Service needs to be integrated with dynamic scaling to provide a fully > functional, dynamically scalable YARN service. -- This message was sent by Atlassian Jira (v8.20.10#820010)