[ 
https://issues.apache.org/jira/browse/GOBBLIN-2174?focusedWorklogId=945674&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-945674
 ]

ASF GitHub Bot logged work on GOBBLIN-2174:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 26/Nov/24 09:52
            Start Date: 26/Nov/24 09:52
    Worklog Time Spent: 10m 
      Work Description: phet commented on code in PR #4077:
URL: https://github.com/apache/gobblin/pull/4077#discussion_r1858153223


##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java:
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.yarn;
+
+import java.util.List;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+import com.google.common.base.Optional;
+import com.google.common.eventbus.EventBus;
+import com.typesafe.config.Config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.temporal.dynamic.ScalingDirective;
+import org.apache.gobblin.temporal.dynamic.StaffingDeltas;
+import org.apache.gobblin.temporal.dynamic.WorkerProfile;
+import org.apache.gobblin.temporal.dynamic.WorkforcePlan;
+import org.apache.gobblin.temporal.dynamic.WorkforceStaffing;
+import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys;
+
+/**
+ * Service for dynamically scaling Gobblin containers running on YARN.
+ * This service manages workforce staffing and plans, and requests new 
containers as needed.
+ */
+@Slf4j
+public class DynamicScalingYarnService extends YarnService {
+
+  /** this holds the current count of containers requested for each worker 
profile */
+  private final WorkforceStaffing workforceStaffing;
+  /** this holds the current total workforce plan as per latest received 
scaling directives */
+  private final WorkforcePlan workforcePlan;
+
+  public DynamicScalingYarnService(Config config, String applicationName, 
String applicationId,
+      YarnConfiguration yarnConfiguration, FileSystem fs, EventBus eventBus) 
throws Exception {
+    super(config, applicationName, applicationId, yarnConfiguration, fs, 
eventBus);
+
+    this.workforceStaffing = 
WorkforceStaffing.initialize(getInitialContainers());
+    this.workforcePlan = new WorkforcePlan(getConfig(), 
getInitialContainers());
+  }
+
+  /**
+   * Revises the workforce plan and requests new containers based on the given 
scaling directives.
+   *
+   * @param scalingDirectives the list of scaling directives
+   */
+  public synchronized void 
reviseWorkforcePlanAndRequestNewContainers(List<ScalingDirective> 
scalingDirectives) {
+    this.workforcePlan.reviseWhenNewer(scalingDirectives);
+    StaffingDeltas deltas = 
this.workforcePlan.calcStaffingDeltas(this.workforceStaffing);
+    requestNewContainersForStaffingDeltas(deltas);
+  }
+
+  private synchronized void 
requestNewContainersForStaffingDeltas(StaffingDeltas deltas) {
+    deltas.getPerProfileDeltas().forEach(profileDelta -> {
+      if (profileDelta.getDelta() > 0) {
+        WorkerProfile workerProfile = profileDelta.getProfile();
+        String profileName = workerProfile.getName();
+        int curNumContainers = 
this.workforceStaffing.getStaffing(profileName).orElse(0);
+        int delta = profileDelta.getDelta();
+        log.info("Requesting {} new containers for profile {} having currently 
{} containers", delta,
+            profileName, curNumContainers);
+        requestContainersForWorkerProfile(workerProfile, delta);
+        // update our staffing after requesting new containers
+        this.workforceStaffing.reviseStaffing(profileName, curNumContainers + 
delta, System.currentTimeMillis());
+      }
+      // TODO: Decide how to handle negative deltas

Review Comment:
   will you add this before merge?  if not, let's at least log when it's 
happening!





Issue Time Tracking
-------------------

    Worklog Id:     (was: 945674)
    Time Spent: 50m  (was: 40m)

> Add GoT YarnService integration with DynamicScaling
> ---------------------------------------------------
>
>                 Key: GOBBLIN-2174
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-2174
>             Project: Apache Gobblin
>          Issue Type: Bug
>          Components: gobblin-core
>            Reporter: Vivek Rai
>            Assignee: Abhishek Tiwari
>            Priority: Major
>          Time Spent: 50m
>  Remaining Estimate: 0h
>
> After dynamic scaling implemented as part of 
> https://issues.apache.org/jira/browse/GOBBLIN-2170 , the Temporal Yarn 
> Service needs to be integrated with the dynamic scaling to have fully 
> functional dynamic scalable yarn service.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to