This is an automated email from the ASF dual-hosted git repository.
hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new ec0f4ec [GOBBLIN-780] Handle scenarios that cause the
YarnAutoScalingManager to be stuck
ec0f4ec is described below
commit ec0f4ec2f3838b1ffa60f3bfad791e5a0b334626
Author: Hung Tran <[email protected]>
AuthorDate: Tue May 28 09:55:51 2019 -0700
[GOBBLIN-780] Handle scenarios that cause the YarnAutoScalingManager to be
stuck
Closes #2644 from htran1/fix_auto_scale_stuck
---
.../gobblin/yarn/YarnAutoScalingManager.java | 27 +++++++--
.../gobblin/yarn/YarnAutoScalingManagerTest.java | 65 +++++++++++++++++++++-
2 files changed, 85 insertions(+), 7 deletions(-)
diff --git
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
index e9d934b..5be2a4b 100644
---
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
+++
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
@@ -60,9 +60,11 @@ public class YarnAutoScalingManager extends
AbstractIdleService {
private final String AUTO_SCALING_PARTITIONS_PER_CONTAINER =
AUTO_SCALING_PREFIX + "partitionsPerContainer";
private final int DEFAULT_AUTO_SCALING_PARTITIONS_PER_CONTAINER = 1;
private final String AUTO_SCALING_MIN_CONTAINERS = AUTO_SCALING_PREFIX +
"minContainers";
- private final int DEFAULT_AUTO_SCALING_MIN_CONTAINERS = 0;
+ private final int DEFAULT_AUTO_SCALING_MIN_CONTAINERS = 1; // the constructor now rejects values <= 0
private final String AUTO_SCALING_MAX_CONTAINERS = AUTO_SCALING_PREFIX +
"maxContainers";
private final int DEFAULT_AUTO_SCALING_MAX_CONTAINERS = Integer.MAX_VALUE;
+ private final String AUTO_SCALING_INITIAL_DELAY = AUTO_SCALING_PREFIX +
"initialDelay";
+ private final int DEFAULT_AUTO_SCALING_INITIAL_DELAY_SECS = 60; // seconds to wait before the first auto-scaling run
private final Config config;
private final HelixManager helixManager;
@@ -85,8 +87,8 @@ public class YarnAutoScalingManager extends
AbstractIdleService {
this.minContainers = ConfigUtils.getInt(this.config,
AUTO_SCALING_MIN_CONTAINERS,
DEFAULT_AUTO_SCALING_MIN_CONTAINERS);
- Preconditions.checkArgument(this.minContainers >= 0,
- DEFAULT_AUTO_SCALING_MIN_CONTAINERS + " needs to be greater than or
equal to 0");
+ Preconditions.checkArgument(this.minContainers > 0, "%s needs to be greater than 0, but was %s",
+ AUTO_SCALING_MIN_CONTAINERS, this.minContainers); // report the config key and bad value, not the default
this.maxContainers = ConfigUtils.getInt(this.config,
AUTO_SCALING_MAX_CONTAINERS,
DEFAULT_AUTO_SCALING_MAX_CONTAINERS);
@@ -106,11 +108,13 @@ public class YarnAutoScalingManager extends
AbstractIdleService {
protected void startUp() throws Exception {
int scheduleInterval = ConfigUtils.getInt(this.config,
AUTO_SCALING_POLLING_INTERVAL_SECS,
DEFAULT_AUTO_SCALING_POLLING_INTERVAL_SECS);
+ int initialDelay = ConfigUtils.getInt(this.config,
AUTO_SCALING_INITIAL_DELAY,
+ DEFAULT_AUTO_SCALING_INITIAL_DELAY_SECS); // seconds before the first auto-scaling run
log.info("Starting the " + YarnAutoScalingManager.class.getSimpleName());
- log.info("Scheduling the auto scaling task with an interval of {}
seconds", scheduleInterval);
+ log.info("Scheduling the auto scaling task with an initial delay of {} seconds and an interval of {} seconds", initialDelay, scheduleInterval);
this.autoScalingExecutor.scheduleAtFixedRate(new
YarnAutoScalingRunnable(new TaskDriver(this.helixManager),
- this.yarnService, this.partitionsPerContainer, this.minContainers,
this.maxContainers), 0,
+ this.yarnService, this.partitionsPerContainer, this.minContainers,
this.maxContainers), initialDelay,
scheduleInterval, TimeUnit.SECONDS);
}
@@ -134,12 +138,23 @@ public class YarnAutoScalingManager extends
AbstractIdleService {
private final int minContainers;
private final int maxContainers;
+
+ @Override
+ public void run() {
+ // Deliberately catch every Throwable: per the ScheduledExecutorService contract, anything escaping run() would suppress all later executions scheduled via scheduleAtFixedRate, leaving auto-scaling stuck
+ try {
+ runInternal();
+ } catch (Throwable t) {
+ log.warn("Suppressing error from YarnAutoScalingRunnable.run()", t);
+ }
+ }
+
/**
* Iterate through the workflows configured in Helix to figure out the
number of required partitions
* and request the {@link YarnService} to scale to the desired number of
containers.
*/
- @Override
- public void run() {
+ @VisibleForTesting // package-private so unit tests can override it, e.g. to inject failures
+ void runInternal() {
Set<String> inUseInstances = new HashSet<>();
int numPartitions = 0;
diff --git
a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnAutoScalingManagerTest.java
b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnAutoScalingManagerTest.java
index 2a36646..58d639e 100644
---
a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnAutoScalingManagerTest.java
+++
b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnAutoScalingManagerTest.java
@@ -351,7 +351,70 @@ public class YarnAutoScalingManagerTest {
runnable.run();
- // 1 containers requested to max and one worker in use
+ // 1 container requested to max and one worker in use
Mockito.verify(mockYarnService,
times(1)).requestTargetNumberOfContainers(1, ImmutableSet.of("worker1"));
}
+
+ /**
+ * Test that an exception thrown by runInternal() is suppressed by run() and does not prevent later runs
+ */
+ @Test
+ public void testSuppressedException() throws IOException {
+ YarnService mockYarnService = mock(YarnService.class);
+ TaskDriver mockTaskDriver = mock(TaskDriver.class);
+ WorkflowConfig mockWorkflowConfig = mock(WorkflowConfig.class);
+ JobDag mockJobDag = mock(JobDag.class);
+
+ Mockito.when(mockJobDag.getAllNodes()).thenReturn(ImmutableSet.of("job1"));
+ Mockito.when(mockWorkflowConfig.getJobDag()).thenReturn(mockJobDag);
+
+ Mockito.when(mockTaskDriver.getWorkflows())
+ .thenReturn(ImmutableMap.of("workflow1", mockWorkflowConfig));
+
+ WorkflowContext mockWorkflowContext = mock(WorkflowContext.class);
+ Mockito.when(mockWorkflowContext.getWorkflowState()).thenReturn(TaskState.IN_PROGRESS);
+
+ Mockito.when(mockTaskDriver.getWorkflowContext("workflow1")).thenReturn(mockWorkflowContext);
+
+ JobContext mockJobContext = mock(JobContext.class);
+ Mockito.when(mockJobContext.getPartitionSet())
+ .thenReturn(ImmutableSet.of(Integer.valueOf(1), Integer.valueOf(2)));
+ Mockito.when(mockJobContext.getAssignedParticipant(2)).thenReturn("worker1");
+
+ Mockito.when(mockTaskDriver.getJobContext("job1")).thenReturn(mockJobContext);
+
+ TestYarnAutoScalingRunnable runnable =
+ new TestYarnAutoScalingRunnable(mockTaskDriver, mockYarnService, 1, 1, 1);
+
+ runnable.setRaiseException(true);
+ runnable.run(); // must not throw: run() suppresses the injected exception
+ Mockito.verify(mockYarnService, Mockito.never()).requestTargetNumberOfContainers(Mockito.anyInt(), Mockito.any());
+
+ runnable.setRaiseException(false);
+ runnable.run();
+ // 1 container requested to max and one worker in use
+ Mockito.verify(mockYarnService, times(1)).requestTargetNumberOfContainers(1, ImmutableSet.of("worker1"));
+ }
+
+ private static class TestYarnAutoScalingRunnable extends YarnAutoScalingManager.YarnAutoScalingRunnable {
+ boolean raiseException = false; // when true, runInternal() fails instead of doing any scaling work
+
+ public TestYarnAutoScalingRunnable(TaskDriver taskDriver, YarnService yarnService, int partitionsPerContainer,
+ int minContainers, int maxContainers) {
+ super(taskDriver, yarnService, partitionsPerContainer, minContainers, maxContainers);
+ }
+
+ @Override
+ void runInternal() {
+ if (raiseException) {
+ throw new RuntimeException("Test exception");
+ }
+
+ super.runInternal();
+ }
+
+ void setRaiseException(boolean shouldRaise) {
+ raiseException = shouldRaise;
+ }
+ }
}