This is an automated email from the ASF dual-hosted git repository.

hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new ec0f4ec  [GOBBLIN-780] Handle scenarios that cause the YarnAutoScalingManager to be stuck
ec0f4ec is described below

commit ec0f4ec2f3838b1ffa60f3bfad791e5a0b334626
Author: Hung Tran <[email protected]>
AuthorDate: Tue May 28 09:55:51 2019 -0700

    [GOBBLIN-780] Handle scenarios that cause the YarnAutoScalingManager to be stuck
    
    Closes #2644 from htran1/fix_auto_scale_stuck
---
 .../gobblin/yarn/YarnAutoScalingManager.java       | 27 +++++++--
 .../gobblin/yarn/YarnAutoScalingManagerTest.java   | 65 +++++++++++++++++++++-
 2 files changed, 85 insertions(+), 7 deletions(-)

diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
index e9d934b..5be2a4b 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
@@ -60,9 +60,11 @@ public class YarnAutoScalingManager extends AbstractIdleService {
   private final String AUTO_SCALING_PARTITIONS_PER_CONTAINER = 
AUTO_SCALING_PREFIX + "partitionsPerContainer";
   private final int DEFAULT_AUTO_SCALING_PARTITIONS_PER_CONTAINER = 1;
   private final String AUTO_SCALING_MIN_CONTAINERS = AUTO_SCALING_PREFIX + 
"minContainers";
-  private final int DEFAULT_AUTO_SCALING_MIN_CONTAINERS = 0;
+  private final int DEFAULT_AUTO_SCALING_MIN_CONTAINERS = 1;
   private final String AUTO_SCALING_MAX_CONTAINERS = AUTO_SCALING_PREFIX + 
"maxContainers";
   private final int DEFAULT_AUTO_SCALING_MAX_CONTAINERS = Integer.MAX_VALUE;
+  private final String AUTO_SCALING_INITIAL_DELAY = AUTO_SCALING_PREFIX + 
"initialDelay";
+  private final int DEFAULT_AUTO_SCALING_INITIAL_DELAY_SECS = 60;
 
   private final Config config;
   private final HelixManager helixManager;
@@ -85,8 +87,8 @@ public class YarnAutoScalingManager extends AbstractIdleService {
     this.minContainers = ConfigUtils.getInt(this.config, 
AUTO_SCALING_MIN_CONTAINERS,
         DEFAULT_AUTO_SCALING_MIN_CONTAINERS);
 
-    Preconditions.checkArgument(this.minContainers >= 0,
-        DEFAULT_AUTO_SCALING_MIN_CONTAINERS + " needs to be greater than or 
equal to 0");
+    Preconditions.checkArgument(this.minContainers > 0,
+        DEFAULT_AUTO_SCALING_MIN_CONTAINERS + " needs to be greater than 0");
 
     this.maxContainers = ConfigUtils.getInt(this.config, 
AUTO_SCALING_MAX_CONTAINERS,
         DEFAULT_AUTO_SCALING_MAX_CONTAINERS);
@@ -106,11 +108,13 @@ public class YarnAutoScalingManager extends AbstractIdleService {
   protected void startUp() throws Exception {
     int scheduleInterval = ConfigUtils.getInt(this.config, 
AUTO_SCALING_POLLING_INTERVAL_SECS,
         DEFAULT_AUTO_SCALING_POLLING_INTERVAL_SECS);
+    int initialDelay = ConfigUtils.getInt(this.config, 
AUTO_SCALING_INITIAL_DELAY,
+        DEFAULT_AUTO_SCALING_INITIAL_DELAY_SECS);
     log.info("Starting the " + YarnAutoScalingManager.class.getSimpleName());
     log.info("Scheduling the auto scaling task with an interval of {} 
seconds", scheduleInterval);
 
     this.autoScalingExecutor.scheduleAtFixedRate(new 
YarnAutoScalingRunnable(new TaskDriver(this.helixManager),
-            this.yarnService, this.partitionsPerContainer, this.minContainers, 
this.maxContainers), 0,
+            this.yarnService, this.partitionsPerContainer, this.minContainers, 
this.maxContainers), initialDelay,
         scheduleInterval, TimeUnit.SECONDS);
   }
 
@@ -134,12 +138,23 @@ public class YarnAutoScalingManager extends AbstractIdleService {
     private final int minContainers;
     private final int maxContainers;
 
+
+    @Override
+    public void run() {
+      // Suppress errors to avoid interrupting any scheduled executions of 
this Runnable
+      try {
+        runInternal();
+      } catch (Throwable t) {
+        log.warn("Suppressing error from YarnAutoScalingRunnable.run()", t);
+      }
+    }
+
     /**
      * Iterate through the workflows configured in Helix to figure out the 
number of required partitions
      * and request the {@link YarnService} to scale to the desired number of 
containers.
      */
-    @Override
-    public void run() {
+    @VisibleForTesting
+    void runInternal() {
       Set<String> inUseInstances = new HashSet<>();
 
       int numPartitions = 0;
diff --git a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnAutoScalingManagerTest.java b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnAutoScalingManagerTest.java
index 2a36646..58d639e 100644
--- a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnAutoScalingManagerTest.java
+++ b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnAutoScalingManagerTest.java
@@ -351,7 +351,70 @@ public class YarnAutoScalingManagerTest {
 
     runnable.run();
 
-    // 1 containers requested  to max and one worker in use
+    // 1 container requested to max and one worker in use
     Mockito.verify(mockYarnService, 
times(1)).requestTargetNumberOfContainers(1, ImmutableSet.of("worker1"));
   }
+
+  /**
+   * Test suppressed exception
+   */
+  @Test
+  public void testSuppressedException() throws IOException {
+    YarnService mockYarnService = mock(YarnService.class);
+    TaskDriver mockTaskDriver = mock(TaskDriver.class);
+    WorkflowConfig mockWorkflowConfig = mock(WorkflowConfig.class);
+    JobDag mockJobDag = mock(JobDag.class);
+
+    Mockito.when(mockJobDag.getAllNodes()).thenReturn(ImmutableSet.of("job1"));
+    Mockito.when(mockWorkflowConfig.getJobDag()).thenReturn(mockJobDag);
+
+    Mockito.when(mockTaskDriver.getWorkflows())
+        .thenReturn(ImmutableMap.of("workflow1", mockWorkflowConfig));
+
+    WorkflowContext mockWorkflowContext = mock(WorkflowContext.class);
+    
Mockito.when(mockWorkflowContext.getWorkflowState()).thenReturn(TaskState.IN_PROGRESS);
+
+    
Mockito.when(mockTaskDriver.getWorkflowContext("workflow1")).thenReturn(mockWorkflowContext);
+
+    JobContext mockJobContext = mock(JobContext.class);
+    Mockito.when(mockJobContext.getPartitionSet())
+        .thenReturn(ImmutableSet.of(Integer.valueOf(1), Integer.valueOf(2)));
+    
Mockito.when(mockJobContext.getAssignedParticipant(2)).thenReturn("worker1");
+
+    
Mockito.when(mockTaskDriver.getJobContext("job1")).thenReturn(mockJobContext);
+
+    TestYarnAutoScalingRunnable runnable =
+        new TestYarnAutoScalingRunnable(mockTaskDriver, mockYarnService, 1, 1, 
1);
+
+    runnable.setRaiseException(true);
+    runnable.run();
+    Mockito.verify(mockYarnService, 
times(0)).requestTargetNumberOfContainers(1, ImmutableSet.of("worker1"));
+
+    runnable.setRaiseException(false);
+    runnable.run();
+    // 1 container requested to max and one worker in use
+    Mockito.verify(mockYarnService, 
times(1)).requestTargetNumberOfContainers(1, ImmutableSet.of("worker1"));
+  }
+
+  private static class TestYarnAutoScalingRunnable extends 
YarnAutoScalingManager.YarnAutoScalingRunnable {
+    boolean raiseException = false;
+
+    public TestYarnAutoScalingRunnable(TaskDriver taskDriver, YarnService 
yarnService, int partitionsPerContainer,
+        int minContainers, int maxContainers) {
+      super(taskDriver, yarnService, partitionsPerContainer, minContainers, 
maxContainers);
+    }
+
+    @Override
+    void runInternal() {
+      if (this.raiseException) {
+        throw new RuntimeException("Test exception");
+      } else {
+        super.runInternal();
+      }
+    }
+
+    void setRaiseException(boolean raiseException) {
+      this.raiseException = raiseException;
+    }
+  }
 }

Reply via email to