This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 702cadf48 [GOBBLIN-1844] Ignore workflows marked for deletion when 
calculating container count (#3709)
702cadf48 is described below

commit 702cadf48f910c79b129032aa673f08ce4397c03
Author: Matthew Ho <[email protected]>
AuthorDate: Mon Jun 26 14:30:55 2023 -0700

    [GOBBLIN-1844] Ignore workflows marked for deletion when calculating 
container count (#3709)
    
    * [GOBBLIN-1844] Ignore workflows marked for deletion when calculating 
container count
    
    * Add comment
---
 .../gobblin/yarn/YarnAutoScalingManager.java       |  11 +-
 .../gobblin/yarn/YarnAutoScalingManagerTest.java   | 501 +++++++--------------
 2 files changed, 165 insertions(+), 347 deletions(-)

diff --git 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
index e6683cfd3..a2f4d8372 100644
--- 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
+++ 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
@@ -30,7 +30,6 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
-
 import org.apache.commons.compress.utils.Sets;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.helix.HelixDataAccessor;
@@ -39,6 +38,7 @@ import org.apache.helix.PropertyKey;
 import org.apache.helix.task.JobConfig;
 import org.apache.helix.task.JobContext;
 import org.apache.helix.task.JobDag;
+import org.apache.helix.task.TargetState;
 import org.apache.helix.task.TaskDriver;
 import org.apache.helix.task.TaskPartitionState;
 import org.apache.helix.task.TaskState;
@@ -220,16 +220,19 @@ public class YarnAutoScalingManager extends 
AbstractIdleService {
       YarnContainerRequestBundle yarnContainerRequestBundle = new 
YarnContainerRequestBundle();
       for (Map.Entry<String, WorkflowConfig> workFlowEntry : 
taskDriver.getWorkflows().entrySet()) {
         WorkflowContext workflowContext = 
taskDriver.getWorkflowContext(workFlowEntry.getKey());
+        WorkflowConfig workflowConfig = workFlowEntry.getValue();
 
-        // Only allocate for active workflows
-        if (workflowContext == null || 
!workflowContext.getWorkflowState().equals(TaskState.IN_PROGRESS)) {
+        // Only allocate for active workflows. Those marked for deletion are 
ignored but the existing containers won't
+        // be released until maxIdleTimeInMinutesBeforeScalingDown
+        if (workflowContext == null ||
+            TargetState.DELETE.equals(workflowConfig.getTargetState()) ||
+            !workflowContext.getWorkflowState().equals(TaskState.IN_PROGRESS)) 
{
           continue;
         }
 
         log.debug("Workflow name {} config {} context {}", 
workFlowEntry.getKey(), workFlowEntry.getValue(),
             workflowContext);
 
-        WorkflowConfig workflowConfig = workFlowEntry.getValue();
         JobDag jobDag = workflowConfig.getJobDag();
         Set<String> jobs = jobDag.getAllNodes();
 
diff --git 
a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnAutoScalingManagerTest.java
 
b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnAutoScalingManagerTest.java
index 687af96fd..10563c2bb 100644
--- 
a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnAutoScalingManagerTest.java
+++ 
b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnAutoScalingManagerTest.java
@@ -17,10 +17,11 @@
 
 package org.apache.gobblin.yarn;
 
-import java.io.IOException;
-import java.util.HashMap;
+import java.util.Arrays;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.helix.HelixDataAccessor;
@@ -29,6 +30,7 @@ import org.apache.helix.PropertyKey;
 import org.apache.helix.task.JobConfig;
 import org.apache.helix.task.JobContext;
 import org.apache.helix.task.JobDag;
+import org.apache.helix.task.TargetState;
 import org.apache.helix.task.TaskDriver;
 import org.apache.helix.task.TaskState;
 import org.apache.helix.task.WorkflowConfig;
@@ -63,34 +65,16 @@ public class YarnAutoScalingManagerTest {
    * Test for one workflow with one job
    */
   @Test
-  public void testOneJob() throws IOException {
+  public void testOneJob() {
     YarnService mockYarnService = mock(YarnService.class);
     TaskDriver mockTaskDriver = mock(TaskDriver.class);
-    WorkflowConfig mockWorkflowConfig = mock(WorkflowConfig.class);
-    JobDag mockJobDag = mock(JobDag.class);
+    WorkflowConfig mockWorkflowConfig =
+        getWorkflowConfig(mockTaskDriver, ImmutableSet.of("job1"), 
TaskState.IN_PROGRESS, TargetState.START, "workflow1");
+    
Mockito.when(mockTaskDriver.getWorkflows()).thenReturn(ImmutableMap.of("workflow1",
 mockWorkflowConfig));
 
-    Mockito.when(mockJobDag.getAllNodes()).thenReturn(ImmutableSet.of("job1"));
-    Mockito.when(mockWorkflowConfig.getJobDag()).thenReturn(mockJobDag);
+    getJobContext(mockTaskDriver, ImmutableMap.of(2, 
"GobblinYarnTaskRunner-1"), "job1", ImmutableSet.of(1, 2));
 
-    Mockito.when(mockTaskDriver.getWorkflows())
-        .thenReturn(ImmutableMap.of("workflow1", mockWorkflowConfig));
-
-    WorkflowContext mockWorkflowContext = mock(WorkflowContext.class);
-    
Mockito.when(mockWorkflowContext.getWorkflowState()).thenReturn(TaskState.IN_PROGRESS);
-
-    
Mockito.when(mockTaskDriver.getWorkflowContext("workflow1")).thenReturn(mockWorkflowContext);
-
-    JobContext mockJobContext = mock(JobContext.class);
-    Mockito.when(mockJobContext.getPartitionSet())
-        .thenReturn(ImmutableSet.of(Integer.valueOf(1), Integer.valueOf(2)));
-    
Mockito.when(mockJobContext.getAssignedParticipant(2)).thenReturn("GobblinYarnTaskRunner-1");
-
-    
Mockito.when(mockTaskDriver.getJobContext("job1")).thenReturn(mockJobContext);
-
-    HelixDataAccessor helixDataAccessor = mock(HelixDataAccessor.class);
-    Mockito.when(helixDataAccessor.keyBuilder()).thenReturn(new 
PropertyKey.Builder("cluster"));
-    Mockito.when(helixDataAccessor.getChildValuesMap(Mockito.any()))
-        .thenReturn(ImmutableMap.of("GobblinYarnTaskRunner-1", new 
HelixProperty("")));
+    HelixDataAccessor helixDataAccessor = 
getHelixDataAccessor(Arrays.asList("GobblinClusterManager", 
"GobblinYarnTaskRunner-1"));
 
     YarnAutoScalingManager.YarnAutoScalingRunnable runnable =
         new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, 
mockYarnService, 1,
@@ -109,40 +93,18 @@ public class YarnAutoScalingManagerTest {
    * Test for one workflow with two jobs
    */
   @Test
-  public void testTwoJobs() throws IOException {
+  public void testTwoJobs() {
     YarnService mockYarnService = mock(YarnService.class);
     TaskDriver mockTaskDriver = mock(TaskDriver.class);
-    WorkflowConfig mockWorkflowConfig = mock(WorkflowConfig.class);
-    JobDag mockJobDag = mock(JobDag.class);
-
-    Mockito.when(mockJobDag.getAllNodes()).thenReturn(ImmutableSet.of("job1", 
"job2"));
-    Mockito.when(mockWorkflowConfig.getJobDag()).thenReturn(mockJobDag);
-
+    WorkflowConfig mockWorkflowConfig =
+        getWorkflowConfig(mockTaskDriver, ImmutableSet.of("job1", "job2"), 
TaskState.IN_PROGRESS, TargetState.START, "workflow1");
     Mockito.when(mockTaskDriver.getWorkflows())
         .thenReturn(ImmutableMap.of("workflow1", mockWorkflowConfig));
 
-    WorkflowContext mockWorkflowContext = mock(WorkflowContext.class);
-    
Mockito.when(mockWorkflowContext.getWorkflowState()).thenReturn(TaskState.IN_PROGRESS);
+    getJobContext(mockTaskDriver, ImmutableMap.of(2, 
"GobblinYarnTaskRunner-1"), "job1", ImmutableSet.of(1, 2));
+    getJobContext(mockTaskDriver, ImmutableMap.of(3, 
"GobblinYarnTaskRunner-2"), "job2");
 
-    
Mockito.when(mockTaskDriver.getWorkflowContext("workflow1")).thenReturn(mockWorkflowContext);
-
-    JobContext mockJobContext1 = mock(JobContext.class);
-    Mockito.when(mockJobContext1.getPartitionSet())
-        .thenReturn(ImmutableSet.of(Integer.valueOf(1), Integer.valueOf(2)));
-    
Mockito.when(mockJobContext1.getAssignedParticipant(2)).thenReturn("GobblinYarnTaskRunner-1");
-    
Mockito.when(mockTaskDriver.getJobContext("job1")).thenReturn(mockJobContext1);
-
-    JobContext mockJobContext2 = mock(JobContext.class);
-    Mockito.when(mockJobContext2.getPartitionSet())
-        .thenReturn(ImmutableSet.of(Integer.valueOf(3)));
-    
Mockito.when(mockJobContext2.getAssignedParticipant(3)).thenReturn("GobblinYarnTaskRunner-2");
-    
Mockito.when(mockTaskDriver.getJobContext("job2")).thenReturn(mockJobContext2);
-
-    HelixDataAccessor helixDataAccessor = mock(HelixDataAccessor.class);
-    Mockito.when(helixDataAccessor.keyBuilder()).thenReturn(new 
PropertyKey.Builder("cluster"));
-    Mockito.when(helixDataAccessor.getChildValuesMap(Mockito.any()))
-        .thenReturn(ImmutableMap.of("GobblinYarnTaskRunner-1", new 
HelixProperty(""),
-            "GobblinYarnTaskRunner-2", new HelixProperty("")));
+    HelixDataAccessor helixDataAccessor = 
getHelixDataAccessor(Arrays.asList("GobblinYarnTaskRunner-1", 
"GobblinYarnTaskRunner-2"));
 
     YarnAutoScalingManager.YarnAutoScalingRunnable runnable =
         new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, 
mockYarnService, 1,
@@ -162,59 +124,24 @@ public class YarnAutoScalingManagerTest {
    * Test for two workflows
    */
   @Test
-  public void testTwoWorkflows() throws IOException {
+  public void testTwoWorkflows()  {
     YarnService mockYarnService = mock(YarnService.class);
     TaskDriver mockTaskDriver = mock(TaskDriver.class);
 
-    WorkflowConfig mockWorkflowConfig1 = mock(WorkflowConfig.class);
-    JobDag mockJobDag1 = mock(JobDag.class);
-
-    Mockito.when(mockJobDag1.getAllNodes()).thenReturn(ImmutableSet.of("job1", 
"job2"));
-    Mockito.when(mockWorkflowConfig1.getJobDag()).thenReturn(mockJobDag1);
-
-    WorkflowContext mockWorkflowContext1 = mock(WorkflowContext.class);
-    
Mockito.when(mockWorkflowContext1.getWorkflowState()).thenReturn(TaskState.IN_PROGRESS);
-
-    
Mockito.when(mockTaskDriver.getWorkflowContext("workflow1")).thenReturn(mockWorkflowContext1);
-
-    JobContext mockJobContext1 = mock(JobContext.class);
-    Mockito.when(mockJobContext1.getPartitionSet())
-        .thenReturn(ImmutableSet.of(Integer.valueOf(1), Integer.valueOf(2)));
-    
Mockito.when(mockJobContext1.getAssignedParticipant(2)).thenReturn("GobblinYarnTaskRunner-1");
-    
Mockito.when(mockTaskDriver.getJobContext("job1")).thenReturn(mockJobContext1);
-
-    JobContext mockJobContext2 = mock(JobContext.class);
-    Mockito.when(mockJobContext2.getPartitionSet())
-        .thenReturn(ImmutableSet.of(Integer.valueOf(3)));
-    
Mockito.when(mockJobContext2.getAssignedParticipant(3)).thenReturn("GobblinYarnTaskRunner-2");
-    
Mockito.when(mockTaskDriver.getJobContext("job2")).thenReturn(mockJobContext2);
-
-    WorkflowConfig mockWorkflowConfig2 = mock(WorkflowConfig.class);
-    JobDag mockJobDag2 = mock(JobDag.class);
-
-    
Mockito.when(mockJobDag2.getAllNodes()).thenReturn(ImmutableSet.of("job3"));
-    Mockito.when(mockWorkflowConfig2.getJobDag()).thenReturn(mockJobDag2);
-
-    WorkflowContext mockWorkflowContext2 = mock(WorkflowContext.class);
-    
Mockito.when(mockWorkflowContext2.getWorkflowState()).thenReturn(TaskState.IN_PROGRESS);
-
-    
Mockito.when(mockTaskDriver.getWorkflowContext("workflow2")).thenReturn(mockWorkflowContext2);
-
-    JobContext mockJobContext3 = mock(JobContext.class);
-    Mockito.when(mockJobContext3.getPartitionSet())
-        .thenReturn(ImmutableSet.of(Integer.valueOf(4), Integer.valueOf(5)));
-    
Mockito.when(mockJobContext3.getAssignedParticipant(4)).thenReturn("GobblinYarnTaskRunner-3");
-    
Mockito.when(mockTaskDriver.getJobContext("job3")).thenReturn(mockJobContext3);
+    WorkflowConfig mockWorkflowConfig1 =
+        getWorkflowConfig(mockTaskDriver, ImmutableSet.of("job1", "job2"), 
TaskState.IN_PROGRESS, TargetState.START, "workflow1");
+    WorkflowConfig mockWorkflowConfig2 =
+        getWorkflowConfig(mockTaskDriver, ImmutableSet.of("job3"), 
TaskState.IN_PROGRESS, TargetState.START, "workflow2");
 
     Mockito.when(mockTaskDriver.getWorkflows())
         .thenReturn(ImmutableMap.of("workflow1", mockWorkflowConfig1, 
"workflow2", mockWorkflowConfig2));
 
-    HelixDataAccessor helixDataAccessor = mock(HelixDataAccessor.class);
-    Mockito.when(helixDataAccessor.keyBuilder()).thenReturn(new 
PropertyKey.Builder("cluster"));
-    Mockito.when(helixDataAccessor.getChildValuesMap(Mockito.any()))
-        .thenReturn(ImmutableMap.of("GobblinYarnTaskRunner-1", new 
HelixProperty(""),
-            "GobblinYarnTaskRunner-2", new HelixProperty(""),
-            "GobblinYarnTaskRunner-3", new HelixProperty("")));
+    getJobContext(mockTaskDriver, ImmutableMap.of(2, 
"GobblinYarnTaskRunner-1"), "job1", ImmutableSet.of(1, 2));
+    getJobContext(mockTaskDriver, ImmutableMap.of(3, 
"GobblinYarnTaskRunner-2"), "job2");
+    getJobContext(mockTaskDriver, ImmutableMap.of(4, 
"GobblinYarnTaskRunner-3"), "job3", ImmutableSet.of(4,5));
+
+    HelixDataAccessor helixDataAccessor = getHelixDataAccessor(Arrays.asList(
+        "GobblinYarnTaskRunner-1", 
"GobblinYarnTaskRunner-2","GobblinYarnTaskRunner-3"));
 
     YarnAutoScalingManager.YarnAutoScalingRunnable runnable =
         new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, 
mockYarnService, 1,
@@ -223,70 +150,37 @@ public class YarnAutoScalingManagerTest {
     runnable.run();
 
     // 5 containers requested and 3 workers in use
-    ArgumentCaptor<YarnContainerRequestBundle> argument = 
ArgumentCaptor.forClass(YarnContainerRequestBundle.class);
-    Mockito.verify(mockYarnService, times(1)).
-        requestTargetNumberOfContainers(argument.capture(),
-            eq(ImmutableSet.of("GobblinYarnTaskRunner-1", 
"GobblinYarnTaskRunner-2", "GobblinYarnTaskRunner-3")));
-    Assert.assertEquals(argument.getValue().getTotalContainers(), 5);
+    assertContainerRequest(mockYarnService, 5,
+        ImmutableSet.of("GobblinYarnTaskRunner-1", "GobblinYarnTaskRunner-2", 
"GobblinYarnTaskRunner-3"));
   }
 
   /**
-   * Test for two workflows with one not in progress.
-   * The partitions for the workflow that is not in progress should not be 
counted.
+   * Test for three workflows with one not in progress and one marked for 
delete.
+   * The partitions for the workflow that is not in progress or is marked for 
delete should not be counted.
    */
   @Test
-  public void testNotInProgress() throws IOException {
+  public void testNotInProgressOrBeingDeleted()  {
     YarnService mockYarnService = mock(YarnService.class);
     TaskDriver mockTaskDriver = mock(TaskDriver.class);
 
-    WorkflowConfig mockWorkflowConfig1 = mock(WorkflowConfig.class);
-    JobDag mockJobDag1 = mock(JobDag.class);
-
-    Mockito.when(mockJobDag1.getAllNodes()).thenReturn(ImmutableSet.of("job1", 
"job2"));
-    Mockito.when(mockWorkflowConfig1.getJobDag()).thenReturn(mockJobDag1);
-
-    WorkflowContext mockWorkflowContext1 = mock(WorkflowContext.class);
-    
Mockito.when(mockWorkflowContext1.getWorkflowState()).thenReturn(TaskState.IN_PROGRESS);
-
-    
Mockito.when(mockTaskDriver.getWorkflowContext("workflow1")).thenReturn(mockWorkflowContext1);
-
-    JobContext mockJobContext1 = mock(JobContext.class);
-    Mockito.when(mockJobContext1.getPartitionSet())
-        .thenReturn(ImmutableSet.of(Integer.valueOf(1), Integer.valueOf(2)));
-    
Mockito.when(mockJobContext1.getAssignedParticipant(2)).thenReturn("GobblinYarnTaskRunner-1");
-    
Mockito.when(mockTaskDriver.getJobContext("job1")).thenReturn(mockJobContext1);
-
-    JobContext mockJobContext2 = mock(JobContext.class);
-    Mockito.when(mockJobContext2.getPartitionSet())
-        .thenReturn(ImmutableSet.of(Integer.valueOf(3)));
-    
Mockito.when(mockJobContext2.getAssignedParticipant(3)).thenReturn("GobblinYarnTaskRunner-2");
-    
Mockito.when(mockTaskDriver.getJobContext("job2")).thenReturn(mockJobContext2);
-
-    WorkflowConfig mockWorkflowConfig2 = mock(WorkflowConfig.class);
-    JobDag mockJobDag2 = mock(JobDag.class);
-
-    
Mockito.when(mockJobDag2.getAllNodes()).thenReturn(ImmutableSet.of("job3"));
-    Mockito.when(mockWorkflowConfig2.getJobDag()).thenReturn(mockJobDag2);
-
-    WorkflowContext mockWorkflowContext2 = mock(WorkflowContext.class);
-    
Mockito.when(mockWorkflowContext2.getWorkflowState()).thenReturn(TaskState.COMPLETED);
-
-    
Mockito.when(mockTaskDriver.getWorkflowContext("workflow2")).thenReturn(mockWorkflowContext2);
-
-    JobContext mockJobContext3 = mock(JobContext.class);
-    Mockito.when(mockJobContext3.getPartitionSet())
-        .thenReturn(ImmutableSet.of(Integer.valueOf(4), Integer.valueOf(5)));
-    
Mockito.when(mockJobContext3.getAssignedParticipant(4)).thenReturn("GobblinYarnTaskRunner-3");
-    
Mockito.when(mockTaskDriver.getJobContext("job3")).thenReturn(mockJobContext3);
-
-    Mockito.when(mockTaskDriver.getWorkflows())
-        .thenReturn(ImmutableMap.of("workflow1", mockWorkflowConfig1, 
"workflow2", mockWorkflowConfig2));
-
-    HelixDataAccessor helixDataAccessor = mock(HelixDataAccessor.class);
-    Mockito.when(helixDataAccessor.keyBuilder()).thenReturn(new 
PropertyKey.Builder("cluster"));
-    Mockito.when(helixDataAccessor.getChildValuesMap(Mockito.any()))
-        .thenReturn(ImmutableMap.of("GobblinYarnTaskRunner-1", new 
HelixProperty(""),
-            "GobblinYarnTaskRunner-2", new HelixProperty("")));
+    WorkflowConfig workflowInProgress = getWorkflowConfig(mockTaskDriver, 
ImmutableSet.of("job-inProgress-1", "job-inProgress-2"), TaskState.IN_PROGRESS, 
TargetState.START, "workflowInProgress");
+    WorkflowConfig workflowCompleted = getWorkflowConfig(mockTaskDriver, 
ImmutableSet.of("job-complete-1"), TaskState.COMPLETED, TargetState.STOP, 
"workflowCompleted");
+    WorkflowConfig workflowSetToBeDeleted = getWorkflowConfig(mockTaskDriver, 
ImmutableSet.of("job-setToDelete-1"), TaskState.IN_PROGRESS, 
TargetState.DELETE, "workflowSetToBeDeleted");
+    Mockito.when(mockTaskDriver.getWorkflows()).thenReturn(ImmutableMap.of(
+        "workflowInProgress", workflowInProgress,
+        "workflowCompleted", workflowCompleted,
+        "workflowSetToBeDeleted", workflowSetToBeDeleted));
+
+    getJobContext(mockTaskDriver, ImmutableMap.of(1, 
"GobblinYarnTaskRunner-1"), "job-inProgress-1",
+        ImmutableSet.of(1,2));
+    getJobContext(mockTaskDriver, ImmutableMap.of(2, 
"GobblinYarnTaskRunner-2"), "job-inProgress-2");
+    getJobContext(mockTaskDriver, ImmutableMap.of(1, 
"GobblinYarnTaskRunner-3"), "job-setToDelete-1");
+    getJobContext(mockTaskDriver, ImmutableMap.of(1, 
"GobblinYarnTaskRunner-4"), "job-complete-1",
+        ImmutableSet.of(1, 5));
+
+    HelixDataAccessor helixDataAccessor = getHelixDataAccessor(
+        Arrays.asList("GobblinClusterManager",
+            "GobblinYarnTaskRunner-1", "GobblinYarnTaskRunner-2", 
"GobblinYarnTaskRunner-3", "GobblinYarnTaskRunner-4"));
 
     YarnAutoScalingManager.YarnAutoScalingRunnable runnable =
         new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, 
mockYarnService, 1,
@@ -294,46 +188,26 @@ public class YarnAutoScalingManagerTest {
 
     runnable.run();
 
-    // 3 containers requested and 2 workers in use
-    ArgumentCaptor<YarnContainerRequestBundle> argument = 
ArgumentCaptor.forClass(YarnContainerRequestBundle.class);
-    Mockito.verify(mockYarnService, times(1)).
-        requestTargetNumberOfContainers(argument.capture(),
-            eq(ImmutableSet.of("GobblinYarnTaskRunner-1", 
"GobblinYarnTaskRunner-2")));
-    Assert.assertEquals(argument.getValue().getTotalContainers(), 3);
+    // 3 containers requested and 4 workers in use
+    assertContainerRequest(mockYarnService, 3,
+        ImmutableSet.of("GobblinYarnTaskRunner-1", "GobblinYarnTaskRunner-2", 
"GobblinYarnTaskRunner-3", "GobblinYarnTaskRunner-4"));
   }
 
   /**
    * Test multiple partitions to one container
    */
   @Test
-  public void testMultiplePartitionsPerContainer() throws IOException {
+  public void testMultiplePartitionsPerContainer()  {
     YarnService mockYarnService = mock(YarnService.class);
     TaskDriver mockTaskDriver = mock(TaskDriver.class);
-    WorkflowConfig mockWorkflowConfig = mock(WorkflowConfig.class);
-    JobDag mockJobDag = mock(JobDag.class);
-
-    Mockito.when(mockJobDag.getAllNodes()).thenReturn(ImmutableSet.of("job1"));
-    Mockito.when(mockWorkflowConfig.getJobDag()).thenReturn(mockJobDag);
+    WorkflowConfig mockWorkflowConfig =
+        getWorkflowConfig(mockTaskDriver, ImmutableSet.of("job1"), 
TaskState.IN_PROGRESS, TargetState.START, "workflow1");
 
     Mockito.when(mockTaskDriver.getWorkflows())
         .thenReturn(ImmutableMap.of("workflow1", mockWorkflowConfig));
 
-    WorkflowContext mockWorkflowContext = mock(WorkflowContext.class);
-    
Mockito.when(mockWorkflowContext.getWorkflowState()).thenReturn(TaskState.IN_PROGRESS);
-
-    
Mockito.when(mockTaskDriver.getWorkflowContext("workflow1")).thenReturn(mockWorkflowContext);
-
-    JobContext mockJobContext = mock(JobContext.class);
-    Mockito.when(mockJobContext.getPartitionSet())
-        .thenReturn(ImmutableSet.of(Integer.valueOf(1), Integer.valueOf(2)));
-    
Mockito.when(mockJobContext.getAssignedParticipant(2)).thenReturn("GobblinYarnTaskRunner-1");
-
-    
Mockito.when(mockTaskDriver.getJobContext("job1")).thenReturn(mockJobContext);
-
-    HelixDataAccessor helixDataAccessor = mock(HelixDataAccessor.class);
-    Mockito.when(helixDataAccessor.keyBuilder()).thenReturn(new 
PropertyKey.Builder("cluster"));
-    Mockito.when(helixDataAccessor.getChildValuesMap(Mockito.any()))
-        .thenReturn(ImmutableMap.of("GobblinYarnTaskRunner-1", new 
HelixProperty("")));
+    getJobContext(mockTaskDriver, ImmutableMap.of(2, 
"GobblinYarnTaskRunner-1"), "job1");
+    HelixDataAccessor helixDataAccessor = 
getHelixDataAccessor(Arrays.asList("GobblinYarnTaskRunner-1"));
 
     YarnAutoScalingManager.YarnAutoScalingRunnable runnable =
         new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, 
mockYarnService, 2,
@@ -342,42 +216,21 @@ public class YarnAutoScalingManagerTest {
     runnable.run();
 
     // 1 container requested since 2 partitions and limit is 2 partitions per 
container. One worker in use.
-    ArgumentCaptor<YarnContainerRequestBundle> argument = 
ArgumentCaptor.forClass(YarnContainerRequestBundle.class);
-    Mockito.verify(mockYarnService, times(1)).
-        requestTargetNumberOfContainers(argument.capture(),
-            eq(ImmutableSet.of("GobblinYarnTaskRunner-1")));
-    Assert.assertEquals(argument.getValue().getTotalContainers(), 1);
+    assertContainerRequest(mockYarnService, 1, 
ImmutableSet.of("GobblinYarnTaskRunner-1"));
   }
 
   @Test
   public void testOverprovision() {
     YarnService mockYarnService = mock(YarnService.class);
     TaskDriver mockTaskDriver = mock(TaskDriver.class);
-    WorkflowConfig mockWorkflowConfig = mock(WorkflowConfig.class);
-    JobDag mockJobDag = mock(JobDag.class);
-
-    Mockito.when(mockJobDag.getAllNodes()).thenReturn(ImmutableSet.of("job1"));
-    Mockito.when(mockWorkflowConfig.getJobDag()).thenReturn(mockJobDag);
-
+    WorkflowConfig mockWorkflowConfig =
+        getWorkflowConfig(mockTaskDriver, ImmutableSet.of("job1"), 
TaskState.IN_PROGRESS, TargetState.START, "workflow1");
     Mockito.when(mockTaskDriver.getWorkflows())
         .thenReturn(ImmutableMap.of("workflow1", mockWorkflowConfig));
 
-    WorkflowContext mockWorkflowContext = mock(WorkflowContext.class);
-    
Mockito.when(mockWorkflowContext.getWorkflowState()).thenReturn(TaskState.IN_PROGRESS);
-
-    
Mockito.when(mockTaskDriver.getWorkflowContext("workflow1")).thenReturn(mockWorkflowContext);
-
-    JobContext mockJobContext = mock(JobContext.class);
-    Mockito.when(mockJobContext.getPartitionSet())
-        .thenReturn(ImmutableSet.of(Integer.valueOf(1), Integer.valueOf(2)));
-    
Mockito.when(mockJobContext.getAssignedParticipant(2)).thenReturn("GobblinYarnTaskRunner-1");
-
-    
Mockito.when(mockTaskDriver.getJobContext("job1")).thenReturn(mockJobContext);
+    getJobContext(mockTaskDriver, ImmutableMap.of(2, 
"GobblinYarnTaskRunner-1"), "job1", ImmutableSet.of(1, 2));
 
-    HelixDataAccessor helixDataAccessor = mock(HelixDataAccessor.class);
-    Mockito.when(helixDataAccessor.keyBuilder()).thenReturn(new 
PropertyKey.Builder("cluster"));
-    Mockito.when(helixDataAccessor.getChildValuesMap(Mockito.any()))
-        .thenReturn(ImmutableMap.of("GobblinYarnTaskRunner-1", new 
HelixProperty("")));
+    HelixDataAccessor helixDataAccessor = 
getHelixDataAccessor(Arrays.asList("GobblinYarnTaskRunner-1"));
 
     YarnAutoScalingManager.YarnAutoScalingRunnable runnable1 =
         new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, 
mockYarnService, 1,
@@ -388,13 +241,9 @@ public class YarnAutoScalingManagerTest {
     // 3 containers requested to max and one worker in use
     // NumPartitions = 2, Partitions per container = 1 and overprovision = 1.2
     // so targetNumContainers = Ceil((2/1) * 1.2)) = 3.
-    ArgumentCaptor<YarnContainerRequestBundle> argument = 
ArgumentCaptor.forClass(YarnContainerRequestBundle.class);
-    Mockito.verify(mockYarnService, times(1)).
-        requestTargetNumberOfContainers(argument.capture(),
-            eq(ImmutableSet.of("GobblinYarnTaskRunner-1")));
-    Assert.assertEquals(argument.getValue().getTotalContainers(), 3);
-
+    assertContainerRequest(mockYarnService, 3, 
ImmutableSet.of("GobblinYarnTaskRunner-1"));
     Mockito.reset(mockYarnService);
+
     YarnAutoScalingManager.YarnAutoScalingRunnable runnable2 =
         new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, 
mockYarnService, 1,
             0.1, noopQueue, helixDataAccessor, defaultHelixTag, 
defaultContainerMemory, defaultContainerCores);
@@ -404,10 +253,7 @@ public class YarnAutoScalingManagerTest {
     // 1 container requested and one worker in use
     // NumPartitions = 2, Partitions per container = 1 and overprovision = 0.1
     // so targetNumContainers = Ceil((2/1) * 0.1) = 1.
-    Mockito.verify(mockYarnService, times(1)).
-        requestTargetNumberOfContainers(argument.capture(),
-            eq(ImmutableSet.of("GobblinYarnTaskRunner-1")));
-    Assert.assertEquals(argument.getValue().getTotalContainers(), 1);
+    assertContainerRequest(mockYarnService, 1, 
ImmutableSet.of("GobblinYarnTaskRunner-1"));
 
     Mockito.reset(mockYarnService);
     YarnAutoScalingManager.YarnAutoScalingRunnable runnable3 =
@@ -419,44 +265,22 @@ public class YarnAutoScalingManagerTest {
     // 12 containers requested and one worker in use
     // NumPartitions = 2, Partitions per container = 1 and overprovision = 6.0,
     // so targetNumContainers = Ceil((2/1) * 6.0) = 12.
-    Mockito.verify(mockYarnService, times(1)).
-        requestTargetNumberOfContainers(argument.capture(),
-            eq(ImmutableSet.of("GobblinYarnTaskRunner-1")));
-    Assert.assertEquals(argument.getValue().getTotalContainers(), 12);
+    assertContainerRequest(mockYarnService, 12, 
ImmutableSet.of("GobblinYarnTaskRunner-1"));
   }
 
   /**
    * Test suppressed exception
    */
   @Test
-  public void testSuppressedException() throws IOException {
+  public void testSuppressedException()  {
     YarnService mockYarnService = mock(YarnService.class);
     TaskDriver mockTaskDriver = mock(TaskDriver.class);
-    WorkflowConfig mockWorkflowConfig = mock(WorkflowConfig.class);
-    JobDag mockJobDag = mock(JobDag.class);
+    WorkflowConfig mockWorkflowConfig = getWorkflowConfig(mockTaskDriver, 
ImmutableSet.of("job1"), TaskState.IN_PROGRESS, TargetState.START, "workflow1");
+    
Mockito.when(mockTaskDriver.getWorkflows()).thenReturn(ImmutableMap.of("workflow1",
 mockWorkflowConfig));
 
-    Mockito.when(mockJobDag.getAllNodes()).thenReturn(ImmutableSet.of("job1"));
-    Mockito.when(mockWorkflowConfig.getJobDag()).thenReturn(mockJobDag);
+    getJobContext(mockTaskDriver, ImmutableMap.of(2, 
"GobblinYarnTaskRunner-1"), "job1", ImmutableSet.of(1, 2));
 
-    Mockito.when(mockTaskDriver.getWorkflows())
-        .thenReturn(ImmutableMap.of("workflow1", mockWorkflowConfig));
-
-    WorkflowContext mockWorkflowContext = mock(WorkflowContext.class);
-    
Mockito.when(mockWorkflowContext.getWorkflowState()).thenReturn(TaskState.IN_PROGRESS);
-
-    
Mockito.when(mockTaskDriver.getWorkflowContext("workflow1")).thenReturn(mockWorkflowContext);
-
-    JobContext mockJobContext = mock(JobContext.class);
-    Mockito.when(mockJobContext.getPartitionSet())
-        .thenReturn(ImmutableSet.of(Integer.valueOf(1), Integer.valueOf(2)));
-    
Mockito.when(mockJobContext.getAssignedParticipant(2)).thenReturn("GobblinYarnTaskRunner-1");
-
-    
Mockito.when(mockTaskDriver.getJobContext("job1")).thenReturn(mockJobContext);
-
-    HelixDataAccessor helixDataAccessor = mock(HelixDataAccessor.class);
-    Mockito.when(helixDataAccessor.keyBuilder()).thenReturn(new 
PropertyKey.Builder("cluster"));
-    Mockito.when(helixDataAccessor.getChildValuesMap(Mockito.any()))
-        .thenReturn(ImmutableMap.of("GobblinYarnTaskRunner-1", new 
HelixProperty("")));
+    HelixDataAccessor helixDataAccessor = 
getHelixDataAccessor(Arrays.asList("GobblinYarnTaskRunner-1"));
 
     TestYarnAutoScalingRunnable runnable =
         new TestYarnAutoScalingRunnable(mockTaskDriver, mockYarnService, 1, 
helixDataAccessor);
@@ -471,14 +295,12 @@ public class YarnAutoScalingManagerTest {
     Mockito.reset(mockYarnService);
     runnable.setRaiseException(false);
     runnable.run();
+
     // 2 container requested
-    Mockito.verify(mockYarnService, times(1)).
-        requestTargetNumberOfContainers(argument.capture(),
-            eq(ImmutableSet.of("GobblinYarnTaskRunner-1")));
-    Assert.assertEquals(argument.getValue().getTotalContainers(), 2);
+    assertContainerRequest(mockYarnService, 2, 
ImmutableSet.of("GobblinYarnTaskRunner-1"));
   }
 
-  public void testMaxValueEvictingQueue() throws Exception {
+  public void testMaxValueEvictingQueue()  {
     Resource resource = Resource.newInstance(16, 1);
     YarnAutoScalingManager.SlidingWindowReservoir window = new 
YarnAutoScalingManager.SlidingWindowReservoir(3, 10);
     // Normal insertion with eviction of originally largest value
@@ -503,36 +325,16 @@ public class YarnAutoScalingManagerTest {
    * candidate for scaling-down.
    */
   @Test
-  public void testInstanceIdleBeyondTolerance() throws IOException {
+  public void testInstanceIdleBeyondTolerance()  {
     YarnService mockYarnService = mock(YarnService.class);
     TaskDriver mockTaskDriver = mock(TaskDriver.class);
-    WorkflowConfig mockWorkflowConfig = mock(WorkflowConfig.class);
-    JobDag mockJobDag = mock(JobDag.class);
-    Mockito.when(mockJobDag.getAllNodes()).thenReturn(ImmutableSet.of("job1"));
-    Mockito.when(mockWorkflowConfig.getJobDag()).thenReturn(mockJobDag);
-
-    Mockito.when(mockTaskDriver.getWorkflows())
-        .thenReturn(ImmutableMap.of("workflow1", mockWorkflowConfig));
-
-    WorkflowContext mockWorkflowContext = mock(WorkflowContext.class);
-    
Mockito.when(mockWorkflowContext.getWorkflowState()).thenReturn(TaskState.IN_PROGRESS);
-
-    
Mockito.when(mockTaskDriver.getWorkflowContext("workflow1")).thenReturn(mockWorkflowContext);
+    WorkflowConfig mockWorkflowConfig = getWorkflowConfig(mockTaskDriver, 
ImmutableSet.of("job1"), TaskState.IN_PROGRESS, TargetState.START, "workflow1");
+    
Mockito.when(mockTaskDriver.getWorkflows()).thenReturn(ImmutableMap.of("workflow1",
 mockWorkflowConfig));
 
     // Having both partition assigned to single instance initially, in this 
case, GobblinYarnTaskRunner-2
-    JobContext mockJobContext = mock(JobContext.class);
-    Mockito.when(mockJobContext.getPartitionSet())
-        .thenReturn(ImmutableSet.of(Integer.valueOf(1), Integer.valueOf(2)));
-    
Mockito.when(mockJobContext.getAssignedParticipant(1)).thenReturn("GobblinYarnTaskRunner-2");
-    
Mockito.when(mockJobContext.getAssignedParticipant(2)).thenReturn("GobblinYarnTaskRunner-2");
+    getJobContext(mockTaskDriver, ImmutableMap.of(1,"GobblinYarnTaskRunner-2", 
2, "GobblinYarnTaskRunner-2"), "job1");
 
-    
Mockito.when(mockTaskDriver.getJobContext("job1")).thenReturn(mockJobContext);
-
-    HelixDataAccessor helixDataAccessor = mock(HelixDataAccessor.class);
-    Mockito.when(helixDataAccessor.keyBuilder()).thenReturn(new 
PropertyKey.Builder("cluster"));
-    Mockito.when(helixDataAccessor.getChildValuesMap(Mockito.any()))
-        .thenReturn(ImmutableMap.of("GobblinYarnTaskRunner-1", new 
HelixProperty(""),
-            "GobblinYarnTaskRunner-2", new HelixProperty("")));
+    HelixDataAccessor helixDataAccessor = 
getHelixDataAccessor(Arrays.asList("GobblinYarnTaskRunner-1", 
"GobblinYarnTaskRunner-2"));
 
     TestYarnAutoScalingRunnable runnable = new 
TestYarnAutoScalingRunnable(mockTaskDriver, mockYarnService,
         1, helixDataAccessor);
@@ -541,21 +343,14 @@ public class YarnAutoScalingManagerTest {
 
     // 2 containers requested and one worker in use, while the evaluation will 
hold for true if not set externally,
     // still tell YarnService there are two instances being used.
-    ArgumentCaptor<YarnContainerRequestBundle> argument = 
ArgumentCaptor.forClass(YarnContainerRequestBundle.class);
-    Mockito.verify(mockYarnService, times(1)).
-        requestTargetNumberOfContainers(argument.capture(),
-            eq(ImmutableSet.of("GobblinYarnTaskRunner-1", 
"GobblinYarnTaskRunner-2")));
-    Assert.assertEquals(argument.getValue().getTotalContainers(), 2);
+    assertContainerRequest(mockYarnService, 2, 
ImmutableSet.of("GobblinYarnTaskRunner-1", "GobblinYarnTaskRunner-2"));
 
     // Set failEvaluation which simulates the "beyond tolerance" case.
     Mockito.reset(mockYarnService);
     runnable.setAlwaysTagUnused(true);
     runnable.run();
 
-    Mockito.verify(mockYarnService, times(1)).
-        requestTargetNumberOfContainers(argument.capture(),
-            eq(ImmutableSet.of("GobblinYarnTaskRunner-2")));
-    Assert.assertEquals(argument.getValue().getTotalContainers(), 2);
+    assertContainerRequest(mockYarnService, 2, 
ImmutableSet.of("GobblinYarnTaskRunner-2"));
   }
 
   @Test
@@ -563,63 +358,29 @@ public class YarnAutoScalingManagerTest {
     YarnService mockYarnService = mock(YarnService.class);
     TaskDriver mockTaskDriver = mock(TaskDriver.class);
 
-    WorkflowConfig mockWorkflowConfig1 = mock(WorkflowConfig.class);
-    JobDag mockJobDag1 = mock(JobDag.class);
-
-    Mockito.when(mockJobDag1.getAllNodes()).thenReturn(ImmutableSet.of("job1", 
"job2"));
-    Mockito.when(mockWorkflowConfig1.getJobDag()).thenReturn(mockJobDag1);
-
-    WorkflowContext mockWorkflowContext1 = mock(WorkflowContext.class);
-    
Mockito.when(mockWorkflowContext1.getWorkflowState()).thenReturn(TaskState.IN_PROGRESS);
-
-    
Mockito.when(mockTaskDriver.getWorkflowContext("workflow1")).thenReturn(mockWorkflowContext1);
-
-    JobContext mockJobContext1 = mock(JobContext.class);
-    Mockito.when(mockJobContext1.getPartitionSet())
-        .thenReturn(ImmutableSet.of(Integer.valueOf(1), Integer.valueOf(2)));
-    
Mockito.when(mockJobContext1.getAssignedParticipant(2)).thenReturn("GobblinYarnTaskRunner-1");
-    
Mockito.when(mockTaskDriver.getJobContext("job1")).thenReturn(mockJobContext1);
-
-    JobContext mockJobContext2 = mock(JobContext.class);
-    Mockito.when(mockJobContext2.getPartitionSet())
-        .thenReturn(ImmutableSet.of(Integer.valueOf(3)));
-    
Mockito.when(mockJobContext2.getAssignedParticipant(3)).thenReturn("GobblinYarnTaskRunner-2");
-    
Mockito.when(mockTaskDriver.getJobContext("job2")).thenReturn(mockJobContext2);
-
-    WorkflowConfig mockWorkflowConfig2 = mock(WorkflowConfig.class);
-    JobDag mockJobDag2 = mock(JobDag.class);
-
-    
Mockito.when(mockJobDag2.getAllNodes()).thenReturn(ImmutableSet.of("job3"));
-    Mockito.when(mockWorkflowConfig2.getJobDag()).thenReturn(mockJobDag2);
-
-    WorkflowContext mockWorkflowContext2 = mock(WorkflowContext.class);
-    
Mockito.when(mockWorkflowContext2.getWorkflowState()).thenReturn(TaskState.IN_PROGRESS);
+    WorkflowConfig mockWorkflowConfig1 =
+        getWorkflowConfig(mockTaskDriver, ImmutableSet.of("job1", "job2"), 
TaskState.IN_PROGRESS, TargetState.START, "workflow1");
+    WorkflowConfig mockWorkflowConfig2 =
+        getWorkflowConfig(mockTaskDriver, ImmutableSet.of("job3"), 
TaskState.IN_PROGRESS, TargetState.START, "workflow2");
+    Mockito.when(mockTaskDriver.getWorkflows())
+        .thenReturn(ImmutableMap.of("workflow1", mockWorkflowConfig1, 
"workflow2", mockWorkflowConfig2));
 
-    
Mockito.when(mockTaskDriver.getWorkflowContext("workflow2")).thenReturn(mockWorkflowContext2);
+    getJobContext(mockTaskDriver, ImmutableMap.of(2, 
"GobblinYarnTaskRunner-1"), "job1", ImmutableSet.of(1, 2));
+    getJobContext(mockTaskDriver, ImmutableMap.of(3, 
"GobblinYarnTaskRunner-2"), "job2");
+    getJobContext(mockTaskDriver, ImmutableMap.of(4, 
"GobblinYarnTaskRunner-3"), "job3", ImmutableSet.of(4, 5));
 
-    JobContext mockJobContext3 = mock(JobContext.class);
-    Mockito.when(mockJobContext3.getPartitionSet())
-        .thenReturn(ImmutableSet.of(Integer.valueOf(4), Integer.valueOf(5)));
-    
Mockito.when(mockJobContext3.getAssignedParticipant(4)).thenReturn("GobblinYarnTaskRunner-3");
-    
Mockito.when(mockTaskDriver.getJobContext("job3")).thenReturn(mockJobContext3);
     JobConfig mockJobConfig3 = mock(JobConfig.class);
+    
Mockito.when(mockTaskDriver.getJobConfig("job3")).thenReturn(mockJobConfig3);
     String helixTag = "test-Tag1";
-    Map<String, String> resourceMap = new HashMap<>();
-    
resourceMap.put(GobblinClusterConfigurationKeys.HELIX_JOB_CONTAINER_MEMORY_MBS, 
"512");
-    resourceMap.put(GobblinClusterConfigurationKeys.HELIX_JOB_CONTAINER_CORES, 
"8");
+    Map<String, String> resourceMap = ImmutableMap.of(
+        GobblinClusterConfigurationKeys.HELIX_JOB_CONTAINER_MEMORY_MBS, "512",
+        GobblinClusterConfigurationKeys.HELIX_JOB_CONTAINER_CORES, "8"
+    );
     Mockito.when(mockJobConfig3.getInstanceGroupTag()).thenReturn(helixTag);
     
Mockito.when(mockJobConfig3.getJobCommandConfigMap()).thenReturn(resourceMap);
-    
Mockito.when(mockTaskDriver.getJobContext("job3")).thenReturn(mockJobContext3);
-    
Mockito.when(mockTaskDriver.getJobConfig("job3")).thenReturn(mockJobConfig3);
-    Mockito.when(mockTaskDriver.getWorkflows())
-        .thenReturn(ImmutableMap.of("workflow1", mockWorkflowConfig1, 
"workflow2", mockWorkflowConfig2));
 
-    HelixDataAccessor helixDataAccessor = mock(HelixDataAccessor.class);
-    Mockito.when(helixDataAccessor.keyBuilder()).thenReturn(new 
PropertyKey.Builder("cluster"));
-    Mockito.when(helixDataAccessor.getChildValuesMap(Mockito.any()))
-        .thenReturn(ImmutableMap.of("GobblinYarnTaskRunner-1", new 
HelixProperty(""),
-            "GobblinYarnTaskRunner-2", new HelixProperty(""),
-            "GobblinYarnTaskRunner-3", new HelixProperty("")));
+    HelixDataAccessor helixDataAccessor = getHelixDataAccessor(
+        Arrays.asList("GobblinYarnTaskRunner-1", "GobblinYarnTaskRunner-2", 
"GobblinYarnTaskRunner-3"));
 
     YarnAutoScalingManager.YarnAutoScalingRunnable runnable =
         new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, 
mockYarnService, 1,
@@ -629,22 +390,76 @@ public class YarnAutoScalingManagerTest {
 
     // 5 containers requested and 3 workers in use
     ArgumentCaptor<YarnContainerRequestBundle> argument = 
ArgumentCaptor.forClass(YarnContainerRequestBundle.class);
-    Mockito.verify(mockYarnService, times(1)).
-        requestTargetNumberOfContainers(argument.capture(),
-            eq(ImmutableSet.of("GobblinYarnTaskRunner-1", 
"GobblinYarnTaskRunner-2", "GobblinYarnTaskRunner-3")));
-    Assert.assertEquals(argument.getValue().getTotalContainers(), 5);
+    assertContainerRequest(argument, mockYarnService, 5, 
ImmutableSet.of("GobblinYarnTaskRunner-1", "GobblinYarnTaskRunner-2", 
"GobblinYarnTaskRunner-3"));
+
+    // Verify that 3 containers requested with default tag and resource 
setting,
+    // while 2 with specific helix tag and resource requirement
     Map<String, Set<String>> resourceHelixTagMap = 
argument.getValue().getResourceHelixTagMap();
     Map<String, Resource> helixTagResourceMap = 
argument.getValue().getHelixTagResourceMap();
     Map<String, Integer> helixTagContainerCountMap = 
argument.getValue().getHelixTagContainerCountMap();
 
-    // Verify that 3 containers requested with default tag and resource 
setting,
-    // while 2 with specific helix tag and resource requirement
     Assert.assertEquals(resourceHelixTagMap.size(), 2);
     Assert.assertEquals(helixTagResourceMap.get(helixTag), 
Resource.newInstance(512, 8));
     Assert.assertEquals(helixTagResourceMap.get(defaultHelixTag), 
Resource.newInstance(defaultContainerMemory, defaultContainerCores));
     Assert.assertEquals((int) helixTagContainerCountMap.get(helixTag), 2);
     Assert.assertEquals((int) helixTagContainerCountMap.get(defaultHelixTag), 
3);
+  }
+
+  /**
+   * Creates a mocked {@link HelixDataAccessor} whose {@code keyBuilder()} returns a
+   * {@link PropertyKey.Builder} constructed with {@code "cluster"} and whose
+   * {@code getChildValuesMap(...)} returns one empty-id {@link HelixProperty} per task runner.
+   *
+   * @param taskRunners names used as keys of the stubbed child-values map
+   * @return the stubbed accessor mock
+   */
+  private HelixDataAccessor getHelixDataAccessor(List<String> taskRunners) {
+    Map<String, HelixProperty> runnerProperties = taskRunners.stream()
+        .collect(Collectors.toMap(runner -> runner, runner -> new HelixProperty("")));
+
+    HelixDataAccessor accessor = mock(HelixDataAccessor.class);
+    Mockito.when(accessor.keyBuilder()).thenReturn(new PropertyKey.Builder("cluster"));
+    Mockito.when(accessor.getChildValuesMap(Mockito.any())).thenReturn(runnerProperties);
+    return accessor;
+  }
+
+  /**
+   * Builds a mocked {@link WorkflowConfig} and stubs the task driver so that
+   * {@code getWorkflowContext(workflowName)} returns a mocked {@link WorkflowContext}.
+   * This helper does not stub {@code TaskDriver.getWorkflows()}; callers register the
+   * returned config there themselves.
+   *
+   * @param mockTaskDriver mocked driver that receives the workflow-context stubbing
+   * @param jobNames value returned by the mocked job DAG's {@code getAllNodes()}
+   * @param taskState value returned by the mocked context's {@code getWorkflowState()}
+   * @param targetState value returned by the mocked config's {@code getTargetState()}
+   * @param workflowName name under which the mocked context is looked up on the driver
+   * @return the mocked workflow config
+   */
+  private WorkflowConfig getWorkflowConfig(TaskDriver mockTaskDriver, ImmutableSet<String> jobNames,
+      TaskState taskState, TargetState targetState, String workflowName) {
+    JobDag jobDag = mock(JobDag.class);
+    Mockito.when(jobDag.getAllNodes()).thenReturn(jobNames);
+
+    WorkflowConfig workflowConfig = mock(WorkflowConfig.class);
+    Mockito.when(workflowConfig.getJobDag()).thenReturn(jobDag);
+    Mockito.when(workflowConfig.getTargetState()).thenReturn(targetState);
+
+    WorkflowContext workflowContext = mock(WorkflowContext.class);
+    Mockito.when(workflowContext.getWorkflowState()).thenReturn(taskState);
+    Mockito.when(mockTaskDriver.getWorkflowContext(workflowName)).thenReturn(workflowContext);
+
+    return workflowConfig;
+  }
+
+  /**
+   * Convenience overload: registers a mocked {@link JobContext} whose partition set is exactly
+   * the key set of {@code assignedParticipantMap}.
+   *
+   * @param mockTaskDriver mocked driver that will return the job context for {@code jobName}
+   * @param assignedParticipantMap partition id to assigned participant name
+   * @param jobName job name under which the context is registered on the driver
+   * @return the mocked job context
+   */
+  private JobContext getJobContext(TaskDriver mockTaskDriver,
+      Map<Integer, String> assignedParticipantMap, String jobName) {
+    Set<Integer> assignedPartitions = assignedParticipantMap.keySet();
+    return getJobContext(mockTaskDriver, assignedParticipantMap, jobName, assignedPartitions);
+  }
+
+  /**
+   * Registers a mocked {@link JobContext} on the task driver under {@code jobName}.
+   * {@code getPartitionSet()} returns an immutable copy of {@code partitionSet}, and
+   * {@code getAssignedParticipant(p)} is stubbed for every entry of {@code assignedParticipantMap}.
+   * Partitions present in {@code partitionSet} but absent from the map get no explicit stubbing.
+   *
+   * @param mockTaskDriver mocked driver that will return the job context for {@code jobName}
+   * @param assignedParticipantMap partition id to assigned participant name
+   * @param jobName job name under which the context is registered on the driver
+   * @param partitionSet full set of partition ids reported by the job context
+   * @return the mocked job context
+   */
+  private JobContext getJobContext(
+      TaskDriver mockTaskDriver,
+      Map<Integer, String> assignedParticipantMap,
+      String jobName,
+      Set<Integer> partitionSet) {
+    JobContext jobContext = mock(JobContext.class);
+    Mockito.when(jobContext.getPartitionSet()).thenReturn(ImmutableSet.copyOf(partitionSet));
+    assignedParticipantMap.forEach((partition, participant) ->
+        Mockito.when(jobContext.getAssignedParticipant(partition)).thenReturn(participant));
+    Mockito.when(mockTaskDriver.getJobContext(jobName)).thenReturn(jobContext);
+    return jobContext;
+  }
+
+  /**
+   * Verifies that {@code requestTargetNumberOfContainers} was invoked exactly once on the mocked
+   * {@link YarnService} with the expected in-use instances, and that the captured request bundle
+   * asks for the expected total number of containers.
+   *
+   * @param argument captor that receives the request bundle, so the caller can inspect it afterwards
+   * @param mockYarnService mocked service on which the call is verified
+   * @param expectedNumberOfContainers expected {@code getTotalContainers()} of the captured bundle
+   * @param expectedInUseInstances expected second argument of the verified call
+   */
+  private void assertContainerRequest(ArgumentCaptor<YarnContainerRequestBundle> argument,
+      YarnService mockYarnService, int expectedNumberOfContainers,
+      ImmutableSet<String> expectedInUseInstances) {
+    // The caller-supplied captor is the only one used; no additional captor is created here.
+    Mockito.verify(mockYarnService, times(1))
+        .requestTargetNumberOfContainers(argument.capture(), eq(expectedInUseInstances));
+    Assert.assertEquals(argument.getValue().getTotalContainers(), expectedNumberOfContainers);
+  }
 
+  /**
+   * Convenience overload for callers that do not need to inspect the captured bundle: creates a
+   * fresh captor and delegates to the captor-accepting overload.
+   *
+   * @param mockYarnService mocked service on which the call is verified
+   * @param expectedNumberOfContainers expected total container count of the request
+   * @param expectedInUseInstances expected in-use instance names passed with the request
+   */
+  private void assertContainerRequest(YarnService mockYarnService, int expectedNumberOfContainers,
+      ImmutableSet<String> expectedInUseInstances) {
+    ArgumentCaptor<YarnContainerRequestBundle> captor =
+        ArgumentCaptor.forClass(YarnContainerRequestBundle.class);
+    assertContainerRequest(captor, mockYarnService, expectedNumberOfContainers, expectedInUseInstances);
   }
 
   private static class TestYarnAutoScalingRunnable extends 
YarnAutoScalingManager.YarnAutoScalingRunnable {


Reply via email to