[ https://issues.apache.org/jira/browse/GOBBLIN-2174?focusedWorklogId=946768&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-946768 ]
ASF GitHub Bot logged work on GOBBLIN-2174: ------------------------------------------- Author: ASF GitHub Bot Created on: 05/Dec/24 00:18 Start Date: 05/Dec/24 00:18 Worklog Time Spent: 10m Work Description: phet commented on code in PR #4077: URL: https://github.com/apache/gobblin/pull/4077#discussion_r1870177782 ########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/AbstractDynamicScalingYarnServiceManager.java: ########## @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.temporal.yarn; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; + +import org.apache.commons.collections.CollectionUtils; + +import com.typesafe.config.Config; +import com.google.common.base.Optional; +import com.google.common.util.concurrent.AbstractIdleService; + +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.util.ConfigUtils; +import org.apache.gobblin.util.ExecutorsUtils; +import org.apache.gobblin.temporal.dynamic.ScalingDirective; +import org.apache.gobblin.temporal.dynamic.ScalingDirectiveSource; +import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys; + +/** + * This class manages the dynamic scaling of the {@link YarnService} by periodically polling for scaling directives and passing + * the latest scaling directives to the {@link DynamicScalingYarnService} for processing. + * + * This is an abstract class that provides the basic functionality for managing dynamic scaling. Subclasses should implement + * {@link #createScalingDirectiveSource()} to provide a {@link ScalingDirectiveSource} that will be used to get scaling directives. 
+ */ +@Slf4j +public abstract class AbstractDynamicScalingYarnServiceManager extends AbstractIdleService { + + protected final static String DYNAMIC_SCALING_POLLING_INTERVAL = GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_PREFIX + "polling.interval"; + private final int DEFAULT_DYNAMIC_SCALING_POLLING_INTERVAL_SECS = 60; + protected final Config config; + private final DynamicScalingYarnService dynamicScalingYarnService; + private final ScheduledExecutorService dynamicScalingExecutor; + + public AbstractDynamicScalingYarnServiceManager(GobblinTemporalApplicationMaster appMaster) { + this.config = appMaster.getConfig(); + if (appMaster.get_yarnService() instanceof DynamicScalingYarnService) { + this.dynamicScalingYarnService = (DynamicScalingYarnService) appMaster.get_yarnService(); + } else { + String errorMsg = "Failure while getting YarnService Instance from GobblinTemporalApplicationMaster::get_yarnService()" + + " YarnService is not an instance of DynamicScalingYarnService"; + log.error(errorMsg); Review Comment: be sure to say what the *actual* instance is ########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/AbstractDynamicScalingYarnServiceManager.java: ########## @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.yarn; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; + +import org.apache.commons.collections.CollectionUtils; + +import com.typesafe.config.Config; +import com.google.common.base.Optional; +import com.google.common.util.concurrent.AbstractIdleService; + +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.util.ConfigUtils; +import org.apache.gobblin.util.ExecutorsUtils; +import org.apache.gobblin.temporal.dynamic.ScalingDirective; +import org.apache.gobblin.temporal.dynamic.ScalingDirectiveSource; +import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys; + +/** + * This class manages the dynamic scaling of the {@link YarnService} by periodically polling for scaling directives and passing + * the latest scaling directives to the {@link DynamicScalingYarnService} for processing. + * + * This is an abstract class that provides the basic functionality for managing dynamic scaling. Subclasses should implement + * {@link #createScalingDirectiveSource()} to provide a {@link ScalingDirectiveSource} that will be used to get scaling directives. 
+ */ +@Slf4j +public abstract class AbstractDynamicScalingYarnServiceManager extends AbstractIdleService { + + protected final static String DYNAMIC_SCALING_POLLING_INTERVAL = GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_PREFIX + "polling.interval"; + private final int DEFAULT_DYNAMIC_SCALING_POLLING_INTERVAL_SECS = 60; + protected final Config config; + private final DynamicScalingYarnService dynamicScalingYarnService; + private final ScheduledExecutorService dynamicScalingExecutor; + + public AbstractDynamicScalingYarnServiceManager(GobblinTemporalApplicationMaster appMaster) { + this.config = appMaster.getConfig(); + if (appMaster.get_yarnService() instanceof DynamicScalingYarnService) { + this.dynamicScalingYarnService = (DynamicScalingYarnService) appMaster.get_yarnService(); + } else { + String errorMsg = "Failure while getting YarnService Instance from GobblinTemporalApplicationMaster::get_yarnService()" + + " YarnService is not an instance of DynamicScalingYarnService"; + log.error(errorMsg); + throw new RuntimeException(errorMsg); + } + this.dynamicScalingExecutor = Executors.newSingleThreadScheduledExecutor( + ExecutorsUtils.newThreadFactory(Optional.of(log), + Optional.of("DynamicScalingExecutor"))); + } + + @Override + protected void startUp() { + int scheduleInterval = ConfigUtils.getInt(this.config, DYNAMIC_SCALING_POLLING_INTERVAL, + DEFAULT_DYNAMIC_SCALING_POLLING_INTERVAL_SECS); + log.info("Starting the " + this.getClass().getSimpleName()); + log.info("Scheduling the dynamic scaling task with an interval of {} seconds", scheduleInterval); Review Comment: nit: combine these into one log msg: ``` "Starting the {} with re-scaling interval of {} seconds", .... ``` ########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java: ########## @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.yarn; + +import java.util.List; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.conf.YarnConfiguration; + +import com.google.common.base.Optional; +import com.google.common.eventbus.EventBus; +import com.typesafe.config.Config; + +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.temporal.dynamic.ScalingDirective; +import org.apache.gobblin.temporal.dynamic.StaffingDeltas; +import org.apache.gobblin.temporal.dynamic.WorkerProfile; +import org.apache.gobblin.temporal.dynamic.WorkforcePlan; +import org.apache.gobblin.temporal.dynamic.WorkforceStaffing; +import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys; + +/** + * Service for dynamically scaling Gobblin containers running on YARN. + * This service manages workforce staffing and plans, and requests new containers as needed. 
+ */ +@Slf4j +public class DynamicScalingYarnService extends YarnService { + + /** this holds the current count of containers requested for each worker profile */ + private final WorkforceStaffing workforceStaffing; Review Comment: for clarity, suggest to name along the lines of `actualWorkforceStaffing` (whereas the one internal to `WorkforcePlan` is planned/intended) ########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java: ########## @@ -803,24 +810,36 @@ public void onContainersAllocated(List<Container> containers) { // Find matching requests and remove the request (YARN-660). We the scheduler are responsible // for cleaning up requests after allocation based on the design in the described ticket. // YARN does not have a delta request API and the requests are not cleaned up automatically. - // Try finding a match first with the host as the resource name then fall back to any resource match. + // Try finding a match first with requestAllocationId (which should always be the case) then fall back to + // finding a match with the host as the resource name which then will fall back to any resource match. // Also see YARN-1902. Container count will explode without this logic for removing container requests. - List<? extends Collection<AMRMClient.ContainerRequest>> matchingRequests = amrmClientAsync - .getMatchingRequests(container.getPriority(), container.getNodeHttpAddress(), container.getResource()); + Collection<AMRMClient.ContainerRequest> matchingRequestsByAllocationRequestId = amrmClientAsync.getMatchingRequests(container.getAllocationRequestId()); + if (!matchingRequestsByAllocationRequestId.isEmpty()) { + AMRMClient.ContainerRequest firstMatchingContainerRequest = matchingRequestsByAllocationRequestId.iterator().next(); + LOGGER.debug("Found matching requests {}, removing first matching request {}", Review Comment: esp. until we are comfortably using in prod, these may better be `.info` log level. 
there may be a lot of repetitive logging, when we scale many machines, but that's still O(100). often the number will be much smaller. either way, such diagnostic info may prove invaluable :) ########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/loadgen/dynamic/DummyScalingDirectiveSource.java: ########## @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.loadgen.dynamic; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys; +import org.apache.gobblin.temporal.dynamic.ProfileDerivation; +import org.apache.gobblin.temporal.dynamic.ProfileOverlay; +import org.apache.gobblin.temporal.dynamic.ScalingDirective; +import org.apache.gobblin.temporal.dynamic.ScalingDirectiveSource; +import org.apache.gobblin.temporal.dynamic.WorkforceProfiles; + + +/** + * A dummy implementation of {@link ScalingDirectiveSource} that returns a fixed set of {@link ScalingDirective}s. 
+ */ +public class DummyScalingDirectiveSource implements ScalingDirectiveSource { + private final AtomicInteger numInvocations = new AtomicInteger(0); + private final Optional<ProfileDerivation> derivedFromBaseline; + public DummyScalingDirectiveSource() { + this.derivedFromBaseline = Optional.of(new ProfileDerivation(WorkforceProfiles.BASELINE_NAME, + new ProfileOverlay.Adding( + new ProfileOverlay.KVPair(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY, "2048"), + new ProfileOverlay.KVPair(GobblinYarnConfigurationKeys.CONTAINER_CORES_KEY, "2") + ) + )); + } + + /** + * @return - A fixed set of {@link ScalingDirective}s corresponding to the invocation number. + */ + @Override + public List<ScalingDirective> getScalingDirectives() { + // Note - profile should exist already or is derived from other profile + if (this.numInvocations.get() == 0) { + this.numInvocations.getAndIncrement(); Review Comment: nit: `incrementAndGet()` once at the top and locally save aside the value to use in these `if/else`s ``` long currentTime = System.currentTimeMillis(); ``` could also be DRY'd up ########## gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceManagerTest.java: ########## @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.yarn; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import com.typesafe.config.ConfigValueFactory; + +import org.apache.gobblin.temporal.dynamic.ScalingDirective; +import org.apache.gobblin.temporal.dynamic.ScalingDirectiveSource; +import org.apache.gobblin.temporal.loadgen.dynamic.DummyScalingDirectiveSource; + +import static org.apache.gobblin.temporal.yarn.AbstractDynamicScalingYarnServiceManager.DYNAMIC_SCALING_POLLING_INTERVAL; + +/** Tests for {@link AbstractDynamicScalingYarnServiceManager.GetScalingDirectivesRunnable}*/ +public class DynamicScalingYarnServiceManagerTest { + + @Mock private DynamicScalingYarnService mockDynamicScalingYarnService; + @Mock private ScalingDirectiveSource mockScalingDirectiveSource; + @Mock private GobblinTemporalApplicationMaster mockGobblinTemporalApplicationMaster; + + @BeforeMethod + public void setup() { + MockitoAnnotations.openMocks(this); + // Using 1 second as polling interval so that the test runs faster and + // GetScalingDirectivesRunnable.run() will be called equal to amount of sleep introduced between startUp + // and shutDown in seconds + Config config = ConfigFactory.empty().withValue(DYNAMIC_SCALING_POLLING_INTERVAL, ConfigValueFactory.fromAnyRef(1)); + Mockito.when(mockGobblinTemporalApplicationMaster.getConfig()).thenReturn(config); + Mockito.when(mockGobblinTemporalApplicationMaster.get_yarnService()).thenReturn(mockDynamicScalingYarnService); + } + + @Test + public void testWhenScalingDirectivesIsNull() throws IOException, 
InterruptedException { + Mockito.when(mockScalingDirectiveSource.getScalingDirectives()).thenReturn(null); + TestDynamicScalingYarnServiceManager testDynamicScalingYarnServiceManager = new TestDynamicScalingYarnServiceManager( + mockGobblinTemporalApplicationMaster, mockScalingDirectiveSource); + testDynamicScalingYarnServiceManager.startUp(); + Thread.sleep(2000); + testDynamicScalingYarnServiceManager.shutDown(); + Mockito.verify(mockDynamicScalingYarnService, Mockito.times(0)).reviseWorkforcePlanAndRequestNewContainers(Mockito.anyList()); + } + + @Test + public void testWhenScalingDirectivesIsEmpty() throws IOException, InterruptedException { + Mockito.when(mockScalingDirectiveSource.getScalingDirectives()).thenReturn(new ArrayList<>()); + TestDynamicScalingYarnServiceManager testDynamicScalingYarnServiceManager = new TestDynamicScalingYarnServiceManager( + mockGobblinTemporalApplicationMaster, mockScalingDirectiveSource); + testDynamicScalingYarnServiceManager.startUp(); + Thread.sleep(2000); + testDynamicScalingYarnServiceManager.shutDown(); + Mockito.verify(mockDynamicScalingYarnService, Mockito.times(0)).reviseWorkforcePlanAndRequestNewContainers(Mockito.anyList()); + } + + /** Note : this test uses {@link org.apache.gobblin.temporal.loadgen.dynamic.DummyScalingDirectiveSource}*/ + @Test + public void testWithDummyScalingDirectiveSource() throws InterruptedException { + // DummyScalingDirectiveSource returns 2 scaling directives in first 3 invocations and after that it returns empty list + // so the total number of invocations after three invocations should always be 3 + TestDynamicScalingYarnServiceManager testDynamicScalingYarnServiceManager = new TestDynamicScalingYarnServiceManager( + mockGobblinTemporalApplicationMaster, new DummyScalingDirectiveSource()); + testDynamicScalingYarnServiceManager.startUp(); + Thread.sleep(5000); // 5 seconds sleep so that GetScalingDirectivesRunnable.run() is called for 5 times + 
testDynamicScalingYarnServiceManager.shutDown(); + Mockito.verify(mockDynamicScalingYarnService, Mockito.times(3)).reviseWorkforcePlanAndRequestNewContainers(Mockito.anyList()); Review Comment: I'm tempted to verify further what it actually did. e.g. that it called `requestContainers`. what do you think? ########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/loadgen/dynamic/DummyDynamicScalingYarnServiceManager.java: ########## @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.loadgen.dynamic; + +import org.apache.gobblin.temporal.dynamic.ScalingDirectiveSource; +import org.apache.gobblin.temporal.yarn.GobblinTemporalApplicationMaster; +import org.apache.gobblin.temporal.yarn.AbstractDynamicScalingYarnServiceManager; + +/** + * {@link DummyScalingDirectiveSource} based implementation of {@link AbstractDynamicScalingYarnServiceManager}. + * This class is meant to be used for testing purposes only. + */ Review Comment: how is this to be used? are you hard coding when making a private build or passing by config? if not the latter yet, would it be worth getting it to work that way? 
########## gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/YarnServiceTest.java: ########## @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.yarn; + +import java.net.URL; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.conf.YarnConfiguration; + +import org.mockito.Mockito; +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import com.typesafe.config.ConfigValueFactory; +import com.google.common.eventbus.EventBus; + +import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys; + +/** Tests for {@link YarnService}*/ +public class YarnServiceTest { + private Config defaultConfigs; + private final YarnConfiguration yarnConfiguration = new YarnConfiguration(); + private final FileSystem mockFileSystem = Mockito.mock(FileSystem.class); + private final EventBus eventBus = new EventBus("TemporalYarnServiceTest"); + + @BeforeClass + public void setup() { + URL url = 
YarnServiceTest.class.getClassLoader() + .getResource(YarnServiceTest.class.getSimpleName() + ".conf"); + Assert.assertNotNull(url, "Could not find resource " + url); + this.defaultConfigs = ConfigFactory.parseURL(url).resolve(); + } + + @Test + public void testBaselineWorkerProfileCreatedWithPassedConfigs() throws Exception { + final int containerMemoryMbs = 1500; + final int containerCores = 5; + final int numContainers = 4; + Config config = this.defaultConfigs + .withValue(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY, ConfigValueFactory.fromAnyRef(containerMemoryMbs)) + .withValue(GobblinYarnConfigurationKeys.CONTAINER_CORES_KEY, ConfigValueFactory.fromAnyRef(containerCores)) + .withValue(GobblinYarnConfigurationKeys.INITIAL_CONTAINERS_KEY, ConfigValueFactory.fromAnyRef(numContainers)); + + YarnService yarnService = new YarnService( + config, + "testApplicationName", + "testApplicationId", + yarnConfiguration, + mockFileSystem, + eventBus + ); + + Assert.assertEquals(yarnService.baselineWorkerProfile.getConfig().getInt(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY), containerMemoryMbs); + Assert.assertEquals(yarnService.baselineWorkerProfile.getConfig().getInt(GobblinYarnConfigurationKeys.CONTAINER_CORES_KEY), containerCores); + Assert.assertEquals(yarnService.baselineWorkerProfile.getConfig().getInt(GobblinYarnConfigurationKeys.INITIAL_CONTAINERS_KEY), numContainers); + } + + @Test + public void testBuildContainerCommand() throws Exception { + final double jvmMemoryXmxRatio = 0.7; + final int jvmMemoryOverheadMbs = 50; + final int resourceMemoryMB = 3072; + final int expectedJvmMemory = (int) (resourceMemoryMB * jvmMemoryXmxRatio) - jvmMemoryOverheadMbs; + + Config config = this.defaultConfigs.withValue(GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_XMX_RATIO_KEY, ConfigValueFactory.fromAnyRef(jvmMemoryXmxRatio)) + .withValue(GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_OVERHEAD_MBS_KEY, 
ConfigValueFactory.fromAnyRef(jvmMemoryOverheadMbs)); + + Resource resource = Resource.newInstance(resourceMemoryMB, 2); + + Container mockContainer = Mockito.mock(Container.class); + Mockito.when(mockContainer.getResource()).thenReturn(resource); + Mockito.when(mockContainer.getAllocationRequestId()).thenReturn(0L); + + YarnService yarnService = new YarnService( + config, + "testApplicationName", + "testApplicationId", + yarnConfiguration, + mockFileSystem, + eventBus + ); + + String command = yarnService.buildContainerCommand(mockContainer, "testHelixParticipantId", "testHelixInstanceTag"); + Assert.assertTrue(command.contains("-Xmx" + expectedJvmMemory + "M")); Review Comment: first off, thank you for kicking off testing for this module! this seems to be a partial clone of the `YarnServiceTest` in `gobblin-yarn`. I suggest noting that origin for maintainers in the javadoc. is any [other validation from the original](https://github.com/apache/gobblin/blob/ab62e72839bb4824b98a442de4dd1647284e880e/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTest.java#L149) worth including here too? ########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/FsSourceDynamicScalingYarnServiceManager.java: ########## Review Comment: I really like what you've done here w/ separating concerns via the abstract base class! :D ########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java: ########## @@ -419,41 +403,20 @@ private EventSubmitter buildEventSubmitter() { .build(); } - /** - * Request an allocation of containers. If numTargetContainers is larger than the max of current and expected number - * of containers then additional containers are requested. - * <p> - * If numTargetContainers is less than the current number of allocated containers then release free containers. 
- * Shrinking is relative to the number of currently allocated containers since it takes time for containers - * to be allocated and assigned work and we want to avoid releasing a container prematurely before it is assigned - * work. This means that a container may not be released even though numTargetContainers is less than the requested - * number of containers. The intended usage is for the caller of this method to make periodic calls to attempt to - * adjust the cluster towards the desired number of containers. - * - * @param inUseInstances a set of in use instances - * @return whether successfully requested the target number of containers - */ - public synchronized boolean requestTargetNumberOfContainers(int numContainers, Set<String> inUseInstances) { - int defaultContainerMemoryMbs = config.getInt(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY); - int defaultContainerCores = config.getInt(GobblinYarnConfigurationKeys. CONTAINER_CORES_KEY); - - LOGGER.info("Trying to set numTargetContainers={}, in-use helix instances count is {}, container map size is {}", - numContainers, inUseInstances.size(), this.containerMap.size()); - - requestContainers(numContainers, Resource.newInstance(defaultContainerMemoryMbs, defaultContainerCores)); - LOGGER.info("Current tag-container desired count:{}, tag-container allocated: {}", numContainers, this.allocatedContainerCountMap); - return true; - } - - // Request initial containers with default resource and helix tag - private void requestInitialContainers(int containersRequested) { - requestTargetNumberOfContainers(containersRequested, Collections.EMPTY_SET); + /** Request Initial containers using baselineWorkerProfile */ Review Comment: let's generalize this impl for reuse, by removing the "baseline" / "initial" parts. 
let's drop the presumption of a well-known default allocation ID and refactor by essentially moving the impl from the `DynamicScalingYarnService` to this base class: ``` private synchronized void requestContainersForWorkerProfile(WorkerProfile workerProfile, int numContainers) { int containerMemoryMbs = workerProfile.getConfig().getInt(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY); int containerCores = workerProfile.getConfig().getInt(GobblinYarnConfigurationKeys.CONTAINER_CORES_KEY); long allocationRequestId = storeByUniqueAllocationRequestId(workerProfile); requestContainers(numContainers, Resource.newInstance(containerMemoryMbs, containerCores), Optional.of(allocationRequestId)); } ``` afterwards, requesting initial containers is merely reaching into config for `numInitialContainers` and calling this common impl ########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/AbstractDynamicScalingYarnServiceManager.java: ########## @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.temporal.yarn; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; + +import org.apache.commons.collections.CollectionUtils; + +import com.typesafe.config.Config; +import com.google.common.base.Optional; +import com.google.common.util.concurrent.AbstractIdleService; + +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.util.ConfigUtils; +import org.apache.gobblin.util.ExecutorsUtils; +import org.apache.gobblin.temporal.dynamic.ScalingDirective; +import org.apache.gobblin.temporal.dynamic.ScalingDirectiveSource; +import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys; + +/** + * This class manages the dynamic scaling of the {@link YarnService} by periodically polling for scaling directives and passing + * the latest scaling directives to the {@link DynamicScalingYarnService} for processing. + * + * This is an abstract class that provides the basic functionality for managing dynamic scaling. Subclasses should implement + * {@link #createScalingDirectiveSource()} to provide a {@link ScalingDirectiveSource} that will be used to get scaling directives. 
+ */ +@Slf4j +public abstract class AbstractDynamicScalingYarnServiceManager extends AbstractIdleService { + + protected final static String DYNAMIC_SCALING_POLLING_INTERVAL = GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_PREFIX + "polling.interval"; + private final int DEFAULT_DYNAMIC_SCALING_POLLING_INTERVAL_SECS = 60; + protected final Config config; + private final DynamicScalingYarnService dynamicScalingYarnService; + private final ScheduledExecutorService dynamicScalingExecutor; + + public AbstractDynamicScalingYarnServiceManager(GobblinTemporalApplicationMaster appMaster) { + this.config = appMaster.getConfig(); + if (appMaster.get_yarnService() instanceof DynamicScalingYarnService) { + this.dynamicScalingYarnService = (DynamicScalingYarnService) appMaster.get_yarnService(); + } else { + String errorMsg = "Failure while getting YarnService Instance from GobblinTemporalApplicationMaster::get_yarnService()" + + " YarnService is not an instance of DynamicScalingYarnService"; + log.error(errorMsg); + throw new RuntimeException(errorMsg); + } + this.dynamicScalingExecutor = Executors.newSingleThreadScheduledExecutor( + ExecutorsUtils.newThreadFactory(Optional.of(log), + Optional.of("DynamicScalingExecutor"))); + } + + @Override + protected void startUp() { + int scheduleInterval = ConfigUtils.getInt(this.config, DYNAMIC_SCALING_POLLING_INTERVAL, + DEFAULT_DYNAMIC_SCALING_POLLING_INTERVAL_SECS); + log.info("Starting the " + this.getClass().getSimpleName()); + log.info("Scheduling the dynamic scaling task with an interval of {} seconds", scheduleInterval); + + this.dynamicScalingExecutor.scheduleAtFixedRate( + new GetScalingDirectivesRunnable(this.dynamicScalingYarnService, createScalingDirectiveSource()), + scheduleInterval, scheduleInterval, TimeUnit.SECONDS + ); + } + + @Override + protected void shutDown() { + log.info("Stopping the " + this.getClass().getSimpleName()); + ExecutorsUtils.shutdownExecutorService(this.dynamicScalingExecutor, Optional.of(log)); 
+ } + + /** + * Create a {@link ScalingDirectiveSource} to use for getting scaling directives. + */ + protected abstract ScalingDirectiveSource createScalingDirectiveSource(); + + /** + * A {@link Runnable} that gets the scaling directives from the {@link ScalingDirectiveSource} and passes them to the + * {@link DynamicScalingYarnService} for processing. + */ + @AllArgsConstructor + static class GetScalingDirectivesRunnable implements Runnable { + private final DynamicScalingYarnService dynamicScalingYarnService; + private final ScalingDirectiveSource scalingDirectiveSource; + + @Override + public void run() { + try { + List<ScalingDirective> scalingDirectives = scalingDirectiveSource.getScalingDirectives(); + if (CollectionUtils.isNotEmpty(scalingDirectives)) { + dynamicScalingYarnService.reviseWorkforcePlanAndRequestNewContainers(scalingDirectives); + } + } catch (IOException e) { + log.error("Failed to get scaling directives", e); + } catch (Throwable t) { + log.error("Suppressing error from GetScalingDirectivesRunnable.run()", t); Review Comment: no need to name where, as that shows in the stack trace. maybe just "Unexpected error with dynamic scaling via directives" ########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/FsSourceDynamicScalingYarnServiceManager.java: ########## @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.yarn; + +import java.util.Optional; + +import org.apache.hadoop.fs.FileSystem; + +import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys; +import org.apache.gobblin.temporal.dynamic.FsScalingDirectiveSource; +import org.apache.gobblin.temporal.dynamic.ScalingDirectiveSource; + +/** + * {@link FsScalingDirectiveSource} based implementation of {@link AbstractDynamicScalingYarnServiceManager}. + */ +public class FsSourceDynamicScalingYarnServiceManager extends AbstractDynamicScalingYarnServiceManager { + private final static String DYNAMIC_SCALING_DIRECTIVES_DIR = GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_PREFIX + "directives.dir"; + private final static String DYNAMIC_SCALING_ERRORS_DIR = GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_PREFIX + "errors.dir"; Review Comment: `public`, as other code may wish to reference/reuse ########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/loadgen/dynamic/DummyScalingDirectiveSource.java: ########## Review Comment: these two classes might better belong in `src/test/java`... anyway, they're completely unrelated to the "temporal load-gen" of the `loadgen` pkg ########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java: ########## @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.yarn; + +import java.util.List; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.conf.YarnConfiguration; + +import com.google.common.base.Optional; +import com.google.common.eventbus.EventBus; +import com.typesafe.config.Config; + +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.temporal.dynamic.ScalingDirective; +import org.apache.gobblin.temporal.dynamic.StaffingDeltas; +import org.apache.gobblin.temporal.dynamic.WorkerProfile; +import org.apache.gobblin.temporal.dynamic.WorkforcePlan; +import org.apache.gobblin.temporal.dynamic.WorkforceStaffing; +import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys; + +/** + * Service for dynamically scaling Gobblin containers running on YARN. + * This service manages workforce staffing and plans, and requests new containers as needed. 
+ */ +@Slf4j +public class DynamicScalingYarnService extends YarnService { + + /** this holds the current count of containers requested for each worker profile */ + private final WorkforceStaffing workforceStaffing; + /** this holds the current total workforce plan as per latest received scaling directives */ + private final WorkforcePlan workforcePlan; + + public DynamicScalingYarnService(Config config, String applicationName, String applicationId, + YarnConfiguration yarnConfiguration, FileSystem fs, EventBus eventBus) throws Exception { + super(config, applicationName, applicationId, yarnConfiguration, fs, eventBus); + + int initialContainers = this.baselineWorkerProfile.getConfig().getInt(GobblinYarnConfigurationKeys.INITIAL_CONTAINERS_KEY); + + this.workforceStaffing = WorkforceStaffing.initialize(initialContainers); Review Comment: [less preferred] either add a comment noting that this truly pre-counts the initial containers in advance of them actually being allocated (when `startUp()` runs), or [preferred] alternatively, just initialize as empty (which is most accurate): ``` this.actualWorkforceStaffing = WorkforceStaffing.initialize(0); ``` then have the base class `YarnService::startUp` call a method you override here: ``` @Override protected void requestInitialContainers() { StaffingDeltas deltas = this.workforcePlan.calcStaffingDeltas(this.actualWorkforceStaffing); requestNewContainersForStaffingDeltas(deltas); } ``` ? ########## gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/YarnServiceTest.java: ########## @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.yarn; + +import java.net.URL; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.conf.YarnConfiguration; + +import org.mockito.Mockito; +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import com.typesafe.config.ConfigValueFactory; +import com.google.common.eventbus.EventBus; + +import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys; + +/** Tests for {@link YarnService}*/ +public class YarnServiceTest { + private Config defaultConfigs; + private final YarnConfiguration yarnConfiguration = new YarnConfiguration(); + private final FileSystem mockFileSystem = Mockito.mock(FileSystem.class); + private final EventBus eventBus = new EventBus("TemporalYarnServiceTest"); + + @BeforeClass + public void setup() { + URL url = YarnServiceTest.class.getClassLoader() + .getResource(YarnServiceTest.class.getSimpleName() + ".conf"); + Assert.assertNotNull(url, "Could not find resource " + url); + this.defaultConfigs = ConfigFactory.parseURL(url).resolve(); + } + + @Test + public void testBaselineWorkerProfileCreatedWithPassedConfigs() throws Exception { + final int containerMemoryMbs = 1500; + final int containerCores = 5; + final int numContainers = 4; + Config config = this.defaultConfigs + 
.withValue(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY, ConfigValueFactory.fromAnyRef(containerMemoryMbs)) + .withValue(GobblinYarnConfigurationKeys.CONTAINER_CORES_KEY, ConfigValueFactory.fromAnyRef(containerCores)) + .withValue(GobblinYarnConfigurationKeys.INITIAL_CONTAINERS_KEY, ConfigValueFactory.fromAnyRef(numContainers)); + + YarnService yarnService = new YarnService( + config, + "testApplicationName", + "testApplicationId", + yarnConfiguration, + mockFileSystem, + eventBus + ); + + Assert.assertEquals(yarnService.baselineWorkerProfile.getConfig().getInt(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY), containerMemoryMbs); + Assert.assertEquals(yarnService.baselineWorkerProfile.getConfig().getInt(GobblinYarnConfigurationKeys.CONTAINER_CORES_KEY), containerCores); + Assert.assertEquals(yarnService.baselineWorkerProfile.getConfig().getInt(GobblinYarnConfigurationKeys.INITIAL_CONTAINERS_KEY), numContainers); + } + + @Test + public void testBuildContainerCommand() throws Exception { + final double jvmMemoryXmxRatio = 0.7; + final int jvmMemoryOverheadMbs = 50; + final int resourceMemoryMB = 3072; + final int expectedJvmMemory = (int) (resourceMemoryMB * jvmMemoryXmxRatio) - jvmMemoryOverheadMbs; + + Config config = this.defaultConfigs.withValue(GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_XMX_RATIO_KEY, ConfigValueFactory.fromAnyRef(jvmMemoryXmxRatio)) + .withValue(GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_OVERHEAD_MBS_KEY, ConfigValueFactory.fromAnyRef(jvmMemoryOverheadMbs)); Review Comment: nit: would line up better if both `.withValue` started a new line ########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java: ########## @@ -257,27 +250,18 @@ public YarnService(Config config, String applicationName, String applicationId, GobblinYarnConfigurationKeys.RELEASED_CONTAINERS_CACHE_EXPIRY_SECS, GobblinYarnConfigurationKeys.DEFAULT_RELEASED_CONTAINERS_CACHE_EXPIRY_SECS), TimeUnit.SECONDS).build(); - 
this.jvmMemoryXmxRatio = ConfigUtils.getDouble(this.config, - GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_XMX_RATIO_KEY, - GobblinYarnConfigurationKeys.DEFAULT_CONTAINER_JVM_MEMORY_XMX_RATIO); - - Preconditions.checkArgument(this.jvmMemoryXmxRatio >= 0 && this.jvmMemoryXmxRatio <= 1, - GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_XMX_RATIO_KEY + " must be between 0 and 1 inclusive"); - - this.jvmMemoryOverheadMbs = ConfigUtils.getInt(this.config, - GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_OVERHEAD_MBS_KEY, - GobblinYarnConfigurationKeys.DEFAULT_CONTAINER_JVM_MEMORY_OVERHEAD_MBS); - - Preconditions.checkArgument(this.jvmMemoryOverheadMbs < this.requestedContainerMemoryMbs * this.jvmMemoryXmxRatio, - GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_OVERHEAD_MBS_KEY + " cannot be more than " - + GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY + " * " - + GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_XMX_RATIO_KEY); - this.appViewAcl = ConfigUtils.getString(this.config, GobblinYarnConfigurationKeys.APP_VIEW_ACL, GobblinYarnConfigurationKeys.DEFAULT_APP_VIEW_ACL); this.containerTimezone = ConfigUtils.getString(this.config, GobblinYarnConfigurationKeys.GOBBLIN_YARN_CONTAINER_TIMEZONE, GobblinYarnConfigurationKeys.DEFAULT_GOBBLIN_YARN_CONTAINER_TIMEZONE); this.jarCacheEnabled = ConfigUtils.getBoolean(this.config, GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED, GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED_DEFAULT); + + // Initialising this baseline worker profile to use as default worker profile in case allocation request id is not in map + this.baselineWorkerProfile = new WorkerProfile(WorkforceProfiles.BASELINE_NAME, this.config); Review Comment: sorry, I don't want to be too pedantic, but the most important initialization for Dynamic Scaling is `WorkforcePlan` as: ``` new WorkforcePlan(this.config, config.getInt(GobblinYarnConfigurationKeys.INITIAL_CONTAINERS_KEY)); ``` which you do in the `DynamicScalingYarnService` ctor.
when that's present, the baseline profile should follow from that plan, rather than the reverse of initializing the plan w/ a free-floating baseline profile, as you have here. AFAICT, there's no need to maintain a `baselineWorkerProfile` member in this class at all. Instead, introduce an overridable method to be called within `startUp` and give it this fallback impl: ``` /** unless overridden to actually scale, "initial" containers may be the app's *only* containers! */ protected void requestInitialContainers() { WorkerProfile baselineWorkerProfile = new WorkerProfile(this.config); // NOTE: I suggest adding an overloaded single arg ctor, which internally uses `WorkforceProfiles.BASELINE_NAME` int numContainers = baselineWorkerProfile.getConfig().getInt(GobblinYarnConfigurationKeys.INITIAL_CONTAINERS_KEY); LOGGER.info("Requesting {} initial (static) containers with baseline (only) profile, never to be re-scaled", numContainers); requestContainersForWorkerProfile(baselineWorkerProfile, numContainers); } ``` ########## gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/YarnServiceTest.java: ########## @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.gobblin.temporal.yarn; + +import java.net.URL; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.conf.YarnConfiguration; + +import org.mockito.Mockito; +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import com.typesafe.config.ConfigValueFactory; +import com.google.common.eventbus.EventBus; + +import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys; + +/** Tests for {@link YarnService}*/ +public class YarnServiceTest { + private Config defaultConfigs; + private final YarnConfiguration yarnConfiguration = new YarnConfiguration(); + private final FileSystem mockFileSystem = Mockito.mock(FileSystem.class); + private final EventBus eventBus = new EventBus("TemporalYarnServiceTest"); + + @BeforeClass + public void setup() { + URL url = YarnServiceTest.class.getClassLoader() + .getResource(YarnServiceTest.class.getSimpleName() + ".conf"); + Assert.assertNotNull(url, "Could not find resource " + url); + this.defaultConfigs = ConfigFactory.parseURL(url).resolve(); + } + + @Test + public void testBaselineWorkerProfileCreatedWithPassedConfigs() throws Exception { + final int containerMemoryMbs = 1500; + final int containerCores = 5; + final int numContainers = 4; + Config config = this.defaultConfigs + .withValue(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY, ConfigValueFactory.fromAnyRef(containerMemoryMbs)) + .withValue(GobblinYarnConfigurationKeys.CONTAINER_CORES_KEY, ConfigValueFactory.fromAnyRef(containerCores)) + .withValue(GobblinYarnConfigurationKeys.INITIAL_CONTAINERS_KEY, ConfigValueFactory.fromAnyRef(numContainers)); + + YarnService yarnService = new YarnService( + config, + "testApplicationName", + "testApplicationId", + yarnConfiguration, + mockFileSystem, 
+ eventBus + ); + + Assert.assertEquals(yarnService.baselineWorkerProfile.getConfig().getInt(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY), containerMemoryMbs); + Assert.assertEquals(yarnService.baselineWorkerProfile.getConfig().getInt(GobblinYarnConfigurationKeys.CONTAINER_CORES_KEY), containerCores); + Assert.assertEquals(yarnService.baselineWorkerProfile.getConfig().getInt(GobblinYarnConfigurationKeys.INITIAL_CONTAINERS_KEY), numContainers); Review Comment: `baselineWorkerProfile` seems like such an internal impl detail that I'm not sure it belongs in validation. how about instead defining the `requestInitialContainers` method I proposed above and having this test call `YarnService::startUp` before validating the args in a call to a method like: ``` protected void requestContainers(int numContainers, Resource resource, Optional<Long> optAllocationRequestId) ``` ? ########## gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceManagerTest.java: ########## @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.temporal.yarn; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import com.typesafe.config.ConfigValueFactory; + +import org.apache.gobblin.temporal.dynamic.ScalingDirective; +import org.apache.gobblin.temporal.dynamic.ScalingDirectiveSource; +import org.apache.gobblin.temporal.loadgen.dynamic.DummyScalingDirectiveSource; + +import static org.apache.gobblin.temporal.yarn.AbstractDynamicScalingYarnServiceManager.DYNAMIC_SCALING_POLLING_INTERVAL; + +/** Tests for {@link AbstractDynamicScalingYarnServiceManager.GetScalingDirectivesRunnable}*/ +public class DynamicScalingYarnServiceManagerTest { + + @Mock private DynamicScalingYarnService mockDynamicScalingYarnService; + @Mock private ScalingDirectiveSource mockScalingDirectiveSource; + @Mock private GobblinTemporalApplicationMaster mockGobblinTemporalApplicationMaster; + + @BeforeMethod + public void setup() { + MockitoAnnotations.openMocks(this); + // Using 1 second as polling interval so that the test runs faster and + // GetScalingDirectivesRunnable.run() will be called equal to amount of sleep introduced between startUp + // and shutDown in seconds + Config config = ConfigFactory.empty().withValue(DYNAMIC_SCALING_POLLING_INTERVAL, ConfigValueFactory.fromAnyRef(1)); + Mockito.when(mockGobblinTemporalApplicationMaster.getConfig()).thenReturn(config); + Mockito.when(mockGobblinTemporalApplicationMaster.get_yarnService()).thenReturn(mockDynamicScalingYarnService); + } + + @Test + public void testWhenScalingDirectivesIsNull() throws IOException, InterruptedException { + Mockito.when(mockScalingDirectiveSource.getScalingDirectives()).thenReturn(null); + 
TestDynamicScalingYarnServiceManager testDynamicScalingYarnServiceManager = new TestDynamicScalingYarnServiceManager( + mockGobblinTemporalApplicationMaster, mockScalingDirectiveSource); + testDynamicScalingYarnServiceManager.startUp(); + Thread.sleep(2000); + testDynamicScalingYarnServiceManager.shutDown(); + Mockito.verify(mockDynamicScalingYarnService, Mockito.times(0)).reviseWorkforcePlanAndRequestNewContainers(Mockito.anyList()); Review Comment: `Mockito.never()` ########## gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceManagerTest.java: ########## @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.temporal.yarn; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import com.typesafe.config.ConfigValueFactory; + +import org.apache.gobblin.temporal.dynamic.ScalingDirective; +import org.apache.gobblin.temporal.dynamic.ScalingDirectiveSource; +import org.apache.gobblin.temporal.loadgen.dynamic.DummyScalingDirectiveSource; + +import static org.apache.gobblin.temporal.yarn.AbstractDynamicScalingYarnServiceManager.DYNAMIC_SCALING_POLLING_INTERVAL; + +/** Tests for {@link AbstractDynamicScalingYarnServiceManager.GetScalingDirectivesRunnable}*/ +public class DynamicScalingYarnServiceManagerTest { + + @Mock private DynamicScalingYarnService mockDynamicScalingYarnService; + @Mock private ScalingDirectiveSource mockScalingDirectiveSource; + @Mock private GobblinTemporalApplicationMaster mockGobblinTemporalApplicationMaster; + + @BeforeMethod + public void setup() { + MockitoAnnotations.openMocks(this); + // Using 1 second as polling interval so that the test runs faster and + // GetScalingDirectivesRunnable.run() will be called equal to amount of sleep introduced between startUp + // and shutDown in seconds + Config config = ConfigFactory.empty().withValue(DYNAMIC_SCALING_POLLING_INTERVAL, ConfigValueFactory.fromAnyRef(1)); + Mockito.when(mockGobblinTemporalApplicationMaster.getConfig()).thenReturn(config); + Mockito.when(mockGobblinTemporalApplicationMaster.get_yarnService()).thenReturn(mockDynamicScalingYarnService); + } + + @Test + public void testWhenScalingDirectivesIsNull() throws IOException, InterruptedException { + Mockito.when(mockScalingDirectiveSource.getScalingDirectives()).thenReturn(null); + 
TestDynamicScalingYarnServiceManager testDynamicScalingYarnServiceManager = new TestDynamicScalingYarnServiceManager( + mockGobblinTemporalApplicationMaster, mockScalingDirectiveSource); + testDynamicScalingYarnServiceManager.startUp(); + Thread.sleep(2000); + testDynamicScalingYarnServiceManager.shutDown(); + Mockito.verify(mockDynamicScalingYarnService, Mockito.times(0)).reviseWorkforcePlanAndRequestNewContainers(Mockito.anyList()); + } + + @Test + public void testWhenScalingDirectivesIsEmpty() throws IOException, InterruptedException { + Mockito.when(mockScalingDirectiveSource.getScalingDirectives()).thenReturn(new ArrayList<>()); + TestDynamicScalingYarnServiceManager testDynamicScalingYarnServiceManager = new TestDynamicScalingYarnServiceManager( + mockGobblinTemporalApplicationMaster, mockScalingDirectiveSource); + testDynamicScalingYarnServiceManager.startUp(); + Thread.sleep(2000); + testDynamicScalingYarnServiceManager.shutDown(); + Mockito.verify(mockDynamicScalingYarnService, Mockito.times(0)).reviseWorkforcePlanAndRequestNewContainers(Mockito.anyList()); + } Review Comment: suggest to combine w/ test above, by chaining a second `.thenReturn` invocation there Issue Time Tracking ------------------- Worklog Id: (was: 946768) Time Spent: 3h (was: 2h 50m) > Add GoT YarnService integration with DynamicScaling > --------------------------------------------------- > > Key: GOBBLIN-2174 > URL: https://issues.apache.org/jira/browse/GOBBLIN-2174 > Project: Apache Gobblin > Issue Type: Bug > Components: gobblin-core > Reporter: Vivek Rai > Assignee: Abhishek Tiwari > Priority: Major > Time Spent: 3h > Remaining Estimate: 0h > > After dynamic scaling implemented as part of > https://issues.apache.org/jira/browse/GOBBLIN-2170 , the Temporal Yarn > Service needs to be integrated with the dynamic scaling to have fully > functional dynamic scalable yarn service. -- This message was sent by Atlassian Jira (v8.20.10#820010)