This is an automated email from the ASF dual-hosted git repository. suvasude pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push: new 2ab6200 [GOBBLIN-1177] Provide a config for overprovisioning Gobblin Yarn containers by a configurable amount[] 2ab6200 is described below commit 2ab6200116062f49f9d3c7854270ea19490d8ba1 Author: sv2000 <sudarsh...@gmail.com> AuthorDate: Tue Jun 2 15:46:24 2020 -0700 [GOBBLIN-1177] Provide a config for overprovisioning Gobblin Yarn containers by a configurable amount[] Closes #3023 from sv2000/containerOverProvision --- .../gobblin/yarn/YarnAutoScalingManager.java | 25 ++++--- .../gobblin/yarn/YarnAutoScalingManagerTest.java | 85 +++++++++++++++++++--- 2 files changed, 92 insertions(+), 18 deletions(-) diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java index d08a4c4..458527a 100644 --- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java +++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java @@ -29,8 +29,6 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; -import org.apache.gobblin.util.ConfigUtils; -import org.apache.gobblin.util.ExecutorsUtils; import org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixManager; import org.apache.helix.PropertyKey; @@ -50,6 +48,9 @@ import com.typesafe.config.Config; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.apache.gobblin.util.ConfigUtils; +import org.apache.gobblin.util.ExecutorsUtils; + import static org.apache.gobblin.yarn.GobblinYarnTaskRunner.HELIX_YARN_INSTANCE_NAME_PREFIX; @@ -69,6 +70,9 @@ public class YarnAutoScalingManager extends AbstractIdleService { private final String AUTO_SCALING_MIN_CONTAINERS = AUTO_SCALING_PREFIX + "minContainers"; private final int DEFAULT_AUTO_SCALING_MIN_CONTAINERS = 1; private final String AUTO_SCALING_MAX_CONTAINERS = AUTO_SCALING_PREFIX + "maxContainers"; + private final String AUTO_SCALING_CONTAINER_OVERPROVISION_FACTOR = AUTO_SCALING_PREFIX + "overProvisionFactor"; + private final double DEFAULT_AUTO_SCALING_CONTAINER_OVERPROVISION_FACTOR = 1.0; + // A rough value of how much containers should be an intolerable number. private final int DEFAULT_AUTO_SCALING_MAX_CONTAINERS = Integer.MAX_VALUE; private final String AUTO_SCALING_INITIAL_DELAY = AUTO_SCALING_PREFIX + "initialDelay"; @@ -85,6 +89,7 @@ public class YarnAutoScalingManager extends AbstractIdleService { private final int partitionsPerContainer; private final int minContainers; private final int maxContainers; + private final double overProvisionFactor; private final SlidingWindowReservoir slidingFixedSizeWindow; private static int maxIdleTimeInMinutesBeforeScalingDown = DEFAULT_MAX_IDLE_TIME_BEFORE_SCALING_DOWN_MINUTES; @@ -107,6 +112,9 @@ public class YarnAutoScalingManager extends AbstractIdleService { this.maxContainers = ConfigUtils.getInt(this.config, AUTO_SCALING_MAX_CONTAINERS, DEFAULT_AUTO_SCALING_MAX_CONTAINERS); + this.overProvisionFactor = ConfigUtils.getDouble(this.config, AUTO_SCALING_CONTAINER_OVERPROVISION_FACTOR, + DEFAULT_AUTO_SCALING_CONTAINER_OVERPROVISION_FACTOR); + Preconditions.checkArgument(this.maxContainers > 0, DEFAULT_AUTO_SCALING_MAX_CONTAINERS + " needs to be greater than 0"); @@ -123,7 +131,7 @@ public class YarnAutoScalingManager extends AbstractIdleService { } @Override - protected void startUp() throws Exception { + protected void startUp() { int scheduleInterval = ConfigUtils.getInt(this.config, AUTO_SCALING_POLLING_INTERVAL_SECS, DEFAULT_AUTO_SCALING_POLLING_INTERVAL_SECS); int initialDelay = ConfigUtils.getInt(this.config, AUTO_SCALING_INITIAL_DELAY, @@ -132,13 +140,13 @@ public class YarnAutoScalingManager extends AbstractIdleService { log.info("Scheduling the auto scaling task with an interval of {} seconds", scheduleInterval); this.autoScalingExecutor.scheduleAtFixedRate(new YarnAutoScalingRunnable(new TaskDriver(this.helixManager), - this.yarnService, this.partitionsPerContainer, this.minContainers, this.maxContainers, + this.yarnService, this.partitionsPerContainer, this.minContainers, this.maxContainers, this.overProvisionFactor, this.slidingFixedSizeWindow, this.helixManager.getHelixDataAccessor()), initialDelay, scheduleInterval, TimeUnit.SECONDS); } @Override - protected void shutDown() throws Exception { + protected void shutDown() { log.info("Stopping the " + YarnAutoScalingManager.class.getSimpleName()); ExecutorsUtils.shutdownExecutorService(this.autoScalingExecutor, Optional.of(log)); @@ -156,6 +164,7 @@ public class YarnAutoScalingManager extends AbstractIdleService { private final int partitionsPerContainer; private final int minContainers; private final int maxContainers; + private final double overProvisionFactor; private final SlidingWindowReservoir slidingWindowReservoir; private final HelixDataAccessor helixDataAccessor; /** @@ -245,11 +254,9 @@ public class YarnAutoScalingManager extends AbstractIdleService { } } - - // compute the target containers as a ceiling of number of partitions divided by the number of containers - // per partition. - int numTargetContainers = (int) Math.ceil((double)numPartitions / this.partitionsPerContainer); + // per partition. Scale the result by a constant overprovision factor. + int numTargetContainers = (int) Math.ceil(((double)numPartitions / this.partitionsPerContainer) * this.overProvisionFactor); // adjust the number of target containers based on the configured min and max container values. numTargetContainers = Math.max(this.minContainers, Math.min(this.maxContainers, numTargetContainers)); diff --git a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnAutoScalingManagerTest.java b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnAutoScalingManagerTest.java index ba6c1db..6c40471 100644 --- a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnAutoScalingManagerTest.java +++ b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnAutoScalingManagerTest.java @@ -82,7 +82,7 @@ public class YarnAutoScalingManagerTest { YarnAutoScalingManager.YarnAutoScalingRunnable runnable = new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, mockYarnService, 1, - 1, 10, noopQueue, helixDataAccessor); + 1, 10, 1.0, noopQueue, helixDataAccessor); runnable.run(); @@ -132,7 +132,7 @@ public class YarnAutoScalingManagerTest { YarnAutoScalingManager.YarnAutoScalingRunnable runnable = new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, mockYarnService, - 1, 1, 10, noopQueue, helixDataAccessor); + 1, 1, 10, 1.0, noopQueue, helixDataAccessor); runnable.run(); @@ -201,7 +201,7 @@ public class YarnAutoScalingManagerTest { YarnAutoScalingManager.YarnAutoScalingRunnable runnable = new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, mockYarnService, - 1, 1, 10, noopQueue, helixDataAccessor); + 1, 1, 10, 1.0, noopQueue, helixDataAccessor); runnable.run(); @@ -270,7 +270,7 @@ public class YarnAutoScalingManagerTest { YarnAutoScalingManager.YarnAutoScalingRunnable runnable = new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, mockYarnService, - 1, 1, 10, noopQueue, helixDataAccessor); + 1, 1, 10, 1.0, noopQueue, helixDataAccessor); runnable.run(); @@ -314,7 +314,7 @@ public class YarnAutoScalingManagerTest { YarnAutoScalingManager.YarnAutoScalingRunnable runnable = new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, mockYarnService, - 2, 1, 10, noopQueue, helixDataAccessor); + 2, 1, 10, 1.0, noopQueue, helixDataAccessor); runnable.run(); @@ -359,7 +359,7 @@ public class YarnAutoScalingManagerTest { YarnAutoScalingManager.YarnAutoScalingRunnable runnable = new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, mockYarnService, - 1, 5, 10, noopQueue, helixDataAccessor); + 1, 5, 10, 1.0, noopQueue, helixDataAccessor); runnable.run(); @@ -368,7 +368,6 @@ public class YarnAutoScalingManagerTest { .requestTargetNumberOfContainers(5, ImmutableSet.of("GobblinYarnTaskRunner-1")); } - /** * Test max containers */ @@ -404,7 +403,7 @@ public class YarnAutoScalingManagerTest { YarnAutoScalingManager.YarnAutoScalingRunnable runnable = new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, mockYarnService, - 1, 1, 1, noopQueue, helixDataAccessor); + 1, 1, 1, 1.0, noopQueue, helixDataAccessor); runnable.run(); @@ -413,6 +412,74 @@ public class YarnAutoScalingManagerTest { .requestTargetNumberOfContainers(1, ImmutableSet.of("GobblinYarnTaskRunner-1")); } + @Test + public void testOverprovision() { + YarnService mockYarnService = mock(YarnService.class); + TaskDriver mockTaskDriver = mock(TaskDriver.class); + WorkflowConfig mockWorkflowConfig = mock(WorkflowConfig.class); + JobDag mockJobDag = mock(JobDag.class); + + Mockito.when(mockJobDag.getAllNodes()).thenReturn(ImmutableSet.of("job1")); + Mockito.when(mockWorkflowConfig.getJobDag()).thenReturn(mockJobDag); + + Mockito.when(mockTaskDriver.getWorkflows()) + .thenReturn(ImmutableMap.of("workflow1", mockWorkflowConfig)); + + WorkflowContext mockWorkflowContext = mock(WorkflowContext.class); + Mockito.when(mockWorkflowContext.getWorkflowState()).thenReturn(TaskState.IN_PROGRESS); + + Mockito.when(mockTaskDriver.getWorkflowContext("workflow1")).thenReturn(mockWorkflowContext); + + JobContext mockJobContext = mock(JobContext.class); + Mockito.when(mockJobContext.getPartitionSet()) + .thenReturn(ImmutableSet.of(Integer.valueOf(1), Integer.valueOf(2))); + Mockito.when(mockJobContext.getAssignedParticipant(2)).thenReturn("GobblinYarnTaskRunner-1"); + + Mockito.when(mockTaskDriver.getJobContext("job1")).thenReturn(mockJobContext); + + HelixDataAccessor helixDataAccessor = mock(HelixDataAccessor.class); + Mockito.when(helixDataAccessor.keyBuilder()).thenReturn(new PropertyKey.Builder("cluster")); + Mockito.when(helixDataAccessor.getChildValuesMap(Mockito.any())) + .thenReturn(ImmutableMap.of("GobblinYarnTaskRunner-1", new HelixProperty(""))); + + YarnAutoScalingManager.YarnAutoScalingRunnable runnable1 = + new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, mockYarnService, + 1, 1, 10, 1.2, noopQueue, helixDataAccessor); + + runnable1.run(); + + // 3 containers requested to max and one worker in use + // NumPartitions = 2, Partitions per container = 1 and overprovision = 1.2, Min containers = 1, Max = 10 + // so targetNumContainers = Max (1, Min(10, Ceil((2/1) * 1.2))) = 3. + Mockito.verify(mockYarnService, times(1)) + .requestTargetNumberOfContainers(3, ImmutableSet.of("GobblinYarnTaskRunner-1")); + + + YarnAutoScalingManager.YarnAutoScalingRunnable runnable2 = + new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, mockYarnService, + 1, 1, 10, 0.1, noopQueue, helixDataAccessor); + + runnable2.run(); + + // 3 containers requested to max and one worker in use + // NumPartitions = 2, Partitions per container = 1 and overprovision = 1.2, Min containers = 1, Max = 10 + // so targetNumContainers = Max (1, Min(10, Ceil((2/1) * 0.1))) = 1. + Mockito.verify(mockYarnService, times(1)) + .requestTargetNumberOfContainers(1, ImmutableSet.of("GobblinYarnTaskRunner-1")); + + YarnAutoScalingManager.YarnAutoScalingRunnable runnable3 = + new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, mockYarnService, + 1, 1, 10, 6.0, noopQueue, helixDataAccessor); + + runnable3.run(); + + // 3 containers requested to max and one worker in use + // NumPartitions = 2, Partitions per container = 1 and overprovision = 6.0, + // so targetNumContainers = Max (1, Min(10, Ceil((2/1) * 6.0))) = 10. + Mockito.verify(mockYarnService, times(1)) + .requestTargetNumberOfContainers(1, ImmutableSet.of("GobblinYarnTaskRunner-1")); + } + /** * Test suppressed exception */ @@ -542,7 +609,7 @@ public class YarnAutoScalingManagerTest { public TestYarnAutoScalingRunnable(TaskDriver taskDriver, YarnService yarnService, int partitionsPerContainer, int minContainers, int maxContainers, HelixDataAccessor helixDataAccessor) { - super(taskDriver, yarnService, partitionsPerContainer, minContainers, maxContainers, noopQueue, helixDataAccessor); + super(taskDriver, yarnService, partitionsPerContainer, minContainers, maxContainers, 1.0, noopQueue, helixDataAccessor); } @Override