[ 
https://issues.apache.org/jira/browse/GOBBLIN-2174?focusedWorklogId=946065&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-946065
 ]

ASF GitHub Bot logged work on GOBBLIN-2174:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 29/Nov/24 06:03
            Start Date: 29/Nov/24 06:03
    Worklog Time Spent: 10m 
      Work Description: Blazer-007 commented on code in PR #4077:
URL: https://github.com/apache/gobblin/pull/4077#discussion_r1862994679


##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceManager.java:
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.yarn;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.fs.FileSystem;
+
+import com.typesafe.config.Config;
+import com.google.common.util.concurrent.AbstractIdleService;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
+import org.apache.gobblin.temporal.dynamic.FsScalingDirectiveSource;
+import org.apache.gobblin.temporal.dynamic.ScalingDirective;
+import org.apache.gobblin.temporal.dynamic.ScalingDirectiveSource;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.ExecutorsUtils;
+import org.apache.gobblin.temporal.loadgen.dynamic.DummyScalingDirectiveSource;
+
+
+/**
+ * This class manages the dynamic scaling of the {@link YarnService} by 
periodically polling for scaling directives and passing
+ * the latest scaling directives to the {@link DynamicScalingYarnService} for 
processing.
+ */
+@Slf4j
+public class DynamicScalingYarnServiceManager extends AbstractIdleService {
+
+  private final String DYNAMIC_SCALING_PREFIX = 
GobblinTemporalConfigurationKeys.PREFIX + "dynamic.scaling.";
+  private final String DYNAMIC_SCALING_DIRECTIVES_DIR = DYNAMIC_SCALING_PREFIX 
+ "directives.dir";
+  private final String DYNAMIC_SCALING_ERRORS_DIR = DYNAMIC_SCALING_PREFIX + 
"errors.dir";
+  private final String DYNAMIC_SCALING_INITIAL_DELAY = DYNAMIC_SCALING_PREFIX 
+ "initial.delay";
+  private final int DEFAULT_DYNAMIC_SCALING_INITIAL_DELAY_SECS = 60;
+  private final String DYNAMIC_SCALING_POLLING_INTERVAL = 
DYNAMIC_SCALING_PREFIX + "polling.interval";
+  private final int DEFAULT_DYNAMIC_SCALING_POLLING_INTERVAL_SECS = 60;
+  private final Config config;
+  DynamicScalingYarnService dynamicScalingYarnService;
+  private final ScheduledExecutorService dynamicScalingExecutor;
+  private final FileSystem fs;
+
+  public DynamicScalingYarnServiceManager(GobblinTemporalApplicationMaster 
appMaster) {
+    this.config = appMaster.getConfig();
+    this.dynamicScalingYarnService = (DynamicScalingYarnService) 
appMaster.get_yarnService();
+    this.dynamicScalingExecutor = Executors.newSingleThreadScheduledExecutor(
+        
ExecutorsUtils.newThreadFactory(com.google.common.base.Optional.of(log),
+            com.google.common.base.Optional.of("DynamicScalingExecutor")));
+    this.fs = appMaster.getFs();
+  }
+
+  @Override
+  protected void startUp() {
+    int scheduleInterval = ConfigUtils.getInt(this.config, 
DYNAMIC_SCALING_POLLING_INTERVAL,
+        DEFAULT_DYNAMIC_SCALING_POLLING_INTERVAL_SECS);
+    int initialDelay = ConfigUtils.getInt(this.config, 
DYNAMIC_SCALING_INITIAL_DELAY,
+        DEFAULT_DYNAMIC_SCALING_INITIAL_DELAY_SECS);
+
+    ScalingDirectiveSource fsScalingDirectiveSource = new 
FsScalingDirectiveSource(
+        this.fs,
+        this.config.getString(DYNAMIC_SCALING_DIRECTIVES_DIR),
+        Optional.ofNullable(this.config.getString(DYNAMIC_SCALING_ERRORS_DIR))
+    );
+
+    // TODO: remove this line later
+    //  Using for testing purposes only
+    ScalingDirectiveSource scalingDirectiveSource = new 
DummyScalingDirectiveSource();

Review Comment:
   But I agree with point of creating abstract class, let me add that in next 
commit.





Issue Time Tracking
-------------------

    Worklog Id:     (was: 946065)
    Time Spent: 2h 10m  (was: 2h)

> Add GoT YarnService integration with DynamicScaling
> ---------------------------------------------------
>
>                 Key: GOBBLIN-2174
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-2174
>             Project: Apache Gobblin
>          Issue Type: Bug
>          Components: gobblin-core
>            Reporter: Vivek Rai
>            Assignee: Abhishek Tiwari
>            Priority: Major
>          Time Spent: 2h 10m
>  Remaining Estimate: 0h
>
> After dynamic scaling implemented as part of 
> https://issues.apache.org/jira/browse/GOBBLIN-2170 , the Temporal Yarn 
> Service needs to be integrated with the dynamic scaling to have fully 
> functional dynamic scalable yarn service.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to