[ https://issues.apache.org/jira/browse/GOBBLIN-2174?focusedWorklogId=945673&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-945673 ]
ASF GitHub Bot logged work on GOBBLIN-2174: ------------------------------------------- Author: ASF GitHub Bot Created on: 26/Nov/24 09:51 Start Date: 26/Nov/24 09:51 Worklog Time Spent: 10m Work Description: phet commented on code in PR #4077: URL: https://github.com/apache/gobblin/pull/4077#discussion_r1858144495 ########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceManager.java: ########## @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.temporal.yarn; + +import java.io.IOException; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.fs.FileSystem; + +import com.typesafe.config.Config; +import com.google.common.util.concurrent.AbstractIdleService; + +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys; +import org.apache.gobblin.temporal.dynamic.FsScalingDirectiveSource; +import org.apache.gobblin.temporal.dynamic.ScalingDirective; +import org.apache.gobblin.temporal.dynamic.ScalingDirectiveSource; +import org.apache.gobblin.util.ConfigUtils; +import org.apache.gobblin.util.ExecutorsUtils; +import org.apache.gobblin.temporal.loadgen.dynamic.DummyScalingDirectiveSource; + + +/** + * This class manages the dynamic scaling of the {@link YarnService} by periodically polling for scaling directives and passing + * the latest scaling directives to the {@link DynamicScalingYarnService} for processing. 
+ */ +@Slf4j +public class DynamicScalingYarnServiceManager extends AbstractIdleService { + + private final String DYNAMIC_SCALING_PREFIX = GobblinTemporalConfigurationKeys.PREFIX + "dynamic.scaling."; + private final String DYNAMIC_SCALING_DIRECTIVES_DIR = DYNAMIC_SCALING_PREFIX + "directives.dir"; + private final String DYNAMIC_SCALING_ERRORS_DIR = DYNAMIC_SCALING_PREFIX + "errors.dir"; + private final String DYNAMIC_SCALING_INITIAL_DELAY = DYNAMIC_SCALING_PREFIX + "initial.delay"; + private final int DEFAULT_DYNAMIC_SCALING_INITIAL_DELAY_SECS = 60; + private final String DYNAMIC_SCALING_POLLING_INTERVAL = DYNAMIC_SCALING_PREFIX + "polling.interval"; + private final int DEFAULT_DYNAMIC_SCALING_POLLING_INTERVAL_SECS = 60; + private final Config config; + DynamicScalingYarnService dynamicScalingYarnService; + private final ScheduledExecutorService dynamicScalingExecutor; + private final FileSystem fs; + + public DynamicScalingYarnServiceManager(GobblinTemporalApplicationMaster appMaster) { + this.config = appMaster.getConfig(); + this.dynamicScalingYarnService = (DynamicScalingYarnService) appMaster.get_yarnService(); + this.dynamicScalingExecutor = Executors.newSingleThreadScheduledExecutor( + ExecutorsUtils.newThreadFactory(com.google.common.base.Optional.of(log), + com.google.common.base.Optional.of("DynamicScalingExecutor"))); + this.fs = appMaster.getFs(); + } + + @Override + protected void startUp() { + int scheduleInterval = ConfigUtils.getInt(this.config, DYNAMIC_SCALING_POLLING_INTERVAL, + DEFAULT_DYNAMIC_SCALING_POLLING_INTERVAL_SECS); + int initialDelay = ConfigUtils.getInt(this.config, DYNAMIC_SCALING_INITIAL_DELAY, + DEFAULT_DYNAMIC_SCALING_INITIAL_DELAY_SECS); + + ScalingDirectiveSource fsScalingDirectiveSource = new FsScalingDirectiveSource( + this.fs, + this.config.getString(DYNAMIC_SCALING_DIRECTIVES_DIR), + Optional.ofNullable(this.config.getString(DYNAMIC_SCALING_ERRORS_DIR)) + ); + + // TODO: remove this line later + // Using for testing 
purposes only + ScalingDirectiveSource scalingDirectiveSource = new DummyScalingDirectiveSource(); + + log.info("Starting the " + this.getClass().getSimpleName()); + log.info("Scheduling the dynamic scaling task with an interval of {} seconds", scheduleInterval); + + this.dynamicScalingExecutor.scheduleAtFixedRate( + new GetScalingDirectivesRunnable(this.dynamicScalingYarnService, scalingDirectiveSource), + initialDelay, scheduleInterval, TimeUnit.SECONDS + ); + } + + @Override + protected void shutDown() { + log.info("Stopping the " + this.getClass().getSimpleName()); + ExecutorsUtils.shutdownExecutorService(this.dynamicScalingExecutor, com.google.common.base.Optional.of(log)); + } + + /** + * A {@link Runnable} that gets the scaling directives from the {@link ScalingDirectiveSource} and passes them to the + * {@link DynamicScalingYarnService} for processing. + */ + @AllArgsConstructor + static class GetScalingDirectivesRunnable implements Runnable { + private final DynamicScalingYarnService dynamicScalingYarnService; + private final ScalingDirectiveSource scalingDirectiveSource; + + @Override + public void run() { + try { + List<ScalingDirective> scalingDirectives = scalingDirectiveSource.getScalingDirectives(); + if (!scalingDirectives.isEmpty()) { + dynamicScalingYarnService.reviseWorkforcePlanAndRequestNewContainers(scalingDirectives); + } + } catch (IOException e) { + log.error("Failed to get scaling directives", e); + } Review Comment: I suggest a `catch (Throwable t)`, since this runs in an executor and nobody ever checks whether the `Runnable` threw, which means the exception might silently transpire w/ no notice ever logged Issue Time Tracking ------------------- Worklog Id: (was: 945673) Time Spent: 40m (was: 0.5h) > Add GoT YarnService integration with DynamicScaling > --------------------------------------------------- > > Key: GOBBLIN-2174 > URL: https://issues.apache.org/jira/browse/GOBBLIN-2174 > Project: Apache Gobblin > Issue Type: Bug > 
Components: gobblin-core > Reporter: Vivek Rai > Assignee: Abhishek Tiwari > Priority: Major > Time Spent: 40m > Remaining Estimate: 0h > > After dynamic scaling was implemented as part of > https://issues.apache.org/jira/browse/GOBBLIN-2170 , the Temporal Yarn > Service needs to be integrated with dynamic scaling to provide a fully > functional, dynamically scalable Yarn service. -- This message was sent by Atlassian Jira (v8.20.10#820010)