[ https://issues.apache.org/jira/browse/GOBBLIN-2174?focusedWorklogId=946035&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-946035 ]
ASF GitHub Bot logged work on GOBBLIN-2174: ------------------------------------------- Author: ASF GitHub Bot Created on: 28/Nov/24 15:37 Start Date: 28/Nov/24 15:37 Worklog Time Spent: 10m Work Description: Blazer-007 commented on code in PR #4077: URL: https://github.com/apache/gobblin/pull/4077#discussion_r1862400342 ########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceManager.java: ########## @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.temporal.yarn; + +import java.io.IOException; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.fs.FileSystem; + +import com.typesafe.config.Config; +import com.google.common.util.concurrent.AbstractIdleService; + +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys; +import org.apache.gobblin.temporal.dynamic.FsScalingDirectiveSource; +import org.apache.gobblin.temporal.dynamic.ScalingDirective; +import org.apache.gobblin.temporal.dynamic.ScalingDirectiveSource; +import org.apache.gobblin.util.ConfigUtils; +import org.apache.gobblin.util.ExecutorsUtils; +import org.apache.gobblin.temporal.loadgen.dynamic.DummyScalingDirectiveSource; + + +/** + * This class manages the dynamic scaling of the {@link YarnService} by periodically polling for scaling directives and passing + * the latest scaling directives to the {@link DynamicScalingYarnService} for processing. 
+ */ +@Slf4j +public class DynamicScalingYarnServiceManager extends AbstractIdleService { + + private final String DYNAMIC_SCALING_PREFIX = GobblinTemporalConfigurationKeys.PREFIX + "dynamic.scaling."; + private final String DYNAMIC_SCALING_DIRECTIVES_DIR = DYNAMIC_SCALING_PREFIX + "directives.dir"; + private final String DYNAMIC_SCALING_ERRORS_DIR = DYNAMIC_SCALING_PREFIX + "errors.dir"; + private final String DYNAMIC_SCALING_INITIAL_DELAY = DYNAMIC_SCALING_PREFIX + "initial.delay"; + private final int DEFAULT_DYNAMIC_SCALING_INITIAL_DELAY_SECS = 60; + private final String DYNAMIC_SCALING_POLLING_INTERVAL = DYNAMIC_SCALING_PREFIX + "polling.interval"; + private final int DEFAULT_DYNAMIC_SCALING_POLLING_INTERVAL_SECS = 60; + private final Config config; + DynamicScalingYarnService dynamicScalingYarnService; + private final ScheduledExecutorService dynamicScalingExecutor; + private final FileSystem fs; + + public DynamicScalingYarnServiceManager(GobblinTemporalApplicationMaster appMaster) { + this.config = appMaster.getConfig(); + this.dynamicScalingYarnService = (DynamicScalingYarnService) appMaster.get_yarnService(); + this.dynamicScalingExecutor = Executors.newSingleThreadScheduledExecutor( + ExecutorsUtils.newThreadFactory(com.google.common.base.Optional.of(log), + com.google.common.base.Optional.of("DynamicScalingExecutor"))); + this.fs = appMaster.getFs(); + } + + @Override + protected void startUp() { + int scheduleInterval = ConfigUtils.getInt(this.config, DYNAMIC_SCALING_POLLING_INTERVAL, + DEFAULT_DYNAMIC_SCALING_POLLING_INTERVAL_SECS); + int initialDelay = ConfigUtils.getInt(this.config, DYNAMIC_SCALING_INITIAL_DELAY, + DEFAULT_DYNAMIC_SCALING_INITIAL_DELAY_SECS); + + ScalingDirectiveSource fsScalingDirectiveSource = new FsScalingDirectiveSource( + this.fs, + this.config.getString(DYNAMIC_SCALING_DIRECTIVES_DIR), + Optional.ofNullable(this.config.getString(DYNAMIC_SCALING_ERRORS_DIR)) + ); + + // TODO: remove this line later + // Using for testing 
purposes only + ScalingDirectiveSource scalingDirectiveSource = new DummyScalingDirectiveSource(); Review Comment: I am using `DummyScalingDirectiveSource();` to launch containers at runtime when I run a job to test the complete e2e flow. With respect to unit testing, I think that would not be very helpful, given that you have already added unit tests for other pieces such as WorkforceStaffing, WorkforcePlan and their methods. >... which reminds me.... how is this DSYSM created and initialized at present? Here, after starting the YarnService - https://github.com/apache/gobblin/blob/master/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/GobblinTemporalApplicationMaster.java#L102 - we initialize the other service classes whose names are passed through config: https://github.com/apache/gobblin/blob/e5d897edaee391d05a55e6ac8a420e3416fef6d9/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java#L78 Issue Time Tracking ------------------- Worklog Id: (was: 946035) Time Spent: 1h 40m (was: 1.5h) > Add GoT YarnService integration with DynamicScaling > --------------------------------------------------- > > Key: GOBBLIN-2174 > URL: https://issues.apache.org/jira/browse/GOBBLIN-2174 > Project: Apache Gobblin > Issue Type: Bug > Components: gobblin-core > Reporter: Vivek Rai > Assignee: Abhishek Tiwari > Priority: Major > Time Spent: 1h 40m > Remaining Estimate: 0h > > After dynamic scaling was implemented as part of > https://issues.apache.org/jira/browse/GOBBLIN-2170 , the Temporal Yarn > Service needs to be integrated with dynamic scaling to have a fully > functional, dynamically scalable yarn service. -- This message was sent by Atlassian Jira (v8.20.10#820010)