This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 702cadf48 [GOBBLIN-1844] Ignore workflows marked for deletion when
calculating container count (#3709)
702cadf48 is described below
commit 702cadf48f910c79b129032aa673f08ce4397c03
Author: Matthew Ho <[email protected]>
AuthorDate: Mon Jun 26 14:30:55 2023 -0700
[GOBBLIN-1844] Ignore workflows marked for deletion when calculating
container count (#3709)
* [GOBBLIN-1844] Ignore workflows marked for deletion when calculating
container count
* Add comment
---
.../gobblin/yarn/YarnAutoScalingManager.java | 11 +-
.../gobblin/yarn/YarnAutoScalingManagerTest.java | 501 +++++++--------------
2 files changed, 165 insertions(+), 347 deletions(-)
diff --git
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
index e6683cfd3..a2f4d8372 100644
---
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
+++
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
@@ -30,7 +30,6 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
-
import org.apache.commons.compress.utils.Sets;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.helix.HelixDataAccessor;
@@ -39,6 +38,7 @@ import org.apache.helix.PropertyKey;
import org.apache.helix.task.JobConfig;
import org.apache.helix.task.JobContext;
import org.apache.helix.task.JobDag;
+import org.apache.helix.task.TargetState;
import org.apache.helix.task.TaskDriver;
import org.apache.helix.task.TaskPartitionState;
import org.apache.helix.task.TaskState;
@@ -220,16 +220,19 @@ public class YarnAutoScalingManager extends
AbstractIdleService {
YarnContainerRequestBundle yarnContainerRequestBundle = new
YarnContainerRequestBundle();
for (Map.Entry<String, WorkflowConfig> workFlowEntry :
taskDriver.getWorkflows().entrySet()) {
WorkflowContext workflowContext =
taskDriver.getWorkflowContext(workFlowEntry.getKey());
+ WorkflowConfig workflowConfig = workFlowEntry.getValue();
- // Only allocate for active workflows
- if (workflowContext == null ||
!workflowContext.getWorkflowState().equals(TaskState.IN_PROGRESS)) {
+ // Only allocate for active workflows. Those marked for deletion are
ignored but the existing containers won't
+ // be released until maxIdleTimeInMinutesBeforeScalingDown
+ if (workflowContext == null ||
+ TargetState.DELETE.equals(workflowConfig.getTargetState()) ||
+ !workflowContext.getWorkflowState().equals(TaskState.IN_PROGRESS))
{
continue;
}
log.debug("Workflow name {} config {} context {}",
workFlowEntry.getKey(), workFlowEntry.getValue(),
workflowContext);
- WorkflowConfig workflowConfig = workFlowEntry.getValue();
JobDag jobDag = workflowConfig.getJobDag();
Set<String> jobs = jobDag.getAllNodes();
diff --git
a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnAutoScalingManagerTest.java
b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnAutoScalingManagerTest.java
index 687af96fd..10563c2bb 100644
---
a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnAutoScalingManagerTest.java
+++
b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnAutoScalingManagerTest.java
@@ -17,10 +17,11 @@
package org.apache.gobblin.yarn;
-import java.io.IOException;
-import java.util.HashMap;
+import java.util.Arrays;
+import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.stream.Collectors;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.helix.HelixDataAccessor;
@@ -29,6 +30,7 @@ import org.apache.helix.PropertyKey;
import org.apache.helix.task.JobConfig;
import org.apache.helix.task.JobContext;
import org.apache.helix.task.JobDag;
+import org.apache.helix.task.TargetState;
import org.apache.helix.task.TaskDriver;
import org.apache.helix.task.TaskState;
import org.apache.helix.task.WorkflowConfig;
@@ -63,34 +65,16 @@ public class YarnAutoScalingManagerTest {
* Test for one workflow with one job
*/
@Test
- public void testOneJob() throws IOException {
+ public void testOneJob() {
YarnService mockYarnService = mock(YarnService.class);
TaskDriver mockTaskDriver = mock(TaskDriver.class);
- WorkflowConfig mockWorkflowConfig = mock(WorkflowConfig.class);
- JobDag mockJobDag = mock(JobDag.class);
+ WorkflowConfig mockWorkflowConfig =
+ getWorkflowConfig(mockTaskDriver, ImmutableSet.of("job1"),
TaskState.IN_PROGRESS, TargetState.START, "workflow1");
+
Mockito.when(mockTaskDriver.getWorkflows()).thenReturn(ImmutableMap.of("workflow1",
mockWorkflowConfig));
- Mockito.when(mockJobDag.getAllNodes()).thenReturn(ImmutableSet.of("job1"));
- Mockito.when(mockWorkflowConfig.getJobDag()).thenReturn(mockJobDag);
+ getJobContext(mockTaskDriver, ImmutableMap.of(2,
"GobblinYarnTaskRunner-1"), "job1", ImmutableSet.of(1, 2));
- Mockito.when(mockTaskDriver.getWorkflows())
- .thenReturn(ImmutableMap.of("workflow1", mockWorkflowConfig));
-
- WorkflowContext mockWorkflowContext = mock(WorkflowContext.class);
-
Mockito.when(mockWorkflowContext.getWorkflowState()).thenReturn(TaskState.IN_PROGRESS);
-
-
Mockito.when(mockTaskDriver.getWorkflowContext("workflow1")).thenReturn(mockWorkflowContext);
-
- JobContext mockJobContext = mock(JobContext.class);
- Mockito.when(mockJobContext.getPartitionSet())
- .thenReturn(ImmutableSet.of(Integer.valueOf(1), Integer.valueOf(2)));
-
Mockito.when(mockJobContext.getAssignedParticipant(2)).thenReturn("GobblinYarnTaskRunner-1");
-
-
Mockito.when(mockTaskDriver.getJobContext("job1")).thenReturn(mockJobContext);
-
- HelixDataAccessor helixDataAccessor = mock(HelixDataAccessor.class);
- Mockito.when(helixDataAccessor.keyBuilder()).thenReturn(new
PropertyKey.Builder("cluster"));
- Mockito.when(helixDataAccessor.getChildValuesMap(Mockito.any()))
- .thenReturn(ImmutableMap.of("GobblinYarnTaskRunner-1", new
HelixProperty("")));
+ HelixDataAccessor helixDataAccessor =
getHelixDataAccessor(Arrays.asList("GobblinClusterManager",
"GobblinYarnTaskRunner-1"));
YarnAutoScalingManager.YarnAutoScalingRunnable runnable =
new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver,
mockYarnService, 1,
@@ -109,40 +93,18 @@ public class YarnAutoScalingManagerTest {
* Test for one workflow with two jobs
*/
@Test
- public void testTwoJobs() throws IOException {
+ public void testTwoJobs() {
YarnService mockYarnService = mock(YarnService.class);
TaskDriver mockTaskDriver = mock(TaskDriver.class);
- WorkflowConfig mockWorkflowConfig = mock(WorkflowConfig.class);
- JobDag mockJobDag = mock(JobDag.class);
-
- Mockito.when(mockJobDag.getAllNodes()).thenReturn(ImmutableSet.of("job1",
"job2"));
- Mockito.when(mockWorkflowConfig.getJobDag()).thenReturn(mockJobDag);
-
+ WorkflowConfig mockWorkflowConfig =
+ getWorkflowConfig(mockTaskDriver, ImmutableSet.of("job1", "job2"),
TaskState.IN_PROGRESS, TargetState.START, "workflow1");
Mockito.when(mockTaskDriver.getWorkflows())
.thenReturn(ImmutableMap.of("workflow1", mockWorkflowConfig));
- WorkflowContext mockWorkflowContext = mock(WorkflowContext.class);
-
Mockito.when(mockWorkflowContext.getWorkflowState()).thenReturn(TaskState.IN_PROGRESS);
+ getJobContext(mockTaskDriver, ImmutableMap.of(2,
"GobblinYarnTaskRunner-1"), "job1", ImmutableSet.of(1, 2));
+ getJobContext(mockTaskDriver, ImmutableMap.of(3,
"GobblinYarnTaskRunner-2"), "job2");
-
Mockito.when(mockTaskDriver.getWorkflowContext("workflow1")).thenReturn(mockWorkflowContext);
-
- JobContext mockJobContext1 = mock(JobContext.class);
- Mockito.when(mockJobContext1.getPartitionSet())
- .thenReturn(ImmutableSet.of(Integer.valueOf(1), Integer.valueOf(2)));
-
Mockito.when(mockJobContext1.getAssignedParticipant(2)).thenReturn("GobblinYarnTaskRunner-1");
-
Mockito.when(mockTaskDriver.getJobContext("job1")).thenReturn(mockJobContext1);
-
- JobContext mockJobContext2 = mock(JobContext.class);
- Mockito.when(mockJobContext2.getPartitionSet())
- .thenReturn(ImmutableSet.of(Integer.valueOf(3)));
-
Mockito.when(mockJobContext2.getAssignedParticipant(3)).thenReturn("GobblinYarnTaskRunner-2");
-
Mockito.when(mockTaskDriver.getJobContext("job2")).thenReturn(mockJobContext2);
-
- HelixDataAccessor helixDataAccessor = mock(HelixDataAccessor.class);
- Mockito.when(helixDataAccessor.keyBuilder()).thenReturn(new
PropertyKey.Builder("cluster"));
- Mockito.when(helixDataAccessor.getChildValuesMap(Mockito.any()))
- .thenReturn(ImmutableMap.of("GobblinYarnTaskRunner-1", new
HelixProperty(""),
- "GobblinYarnTaskRunner-2", new HelixProperty("")));
+ HelixDataAccessor helixDataAccessor =
getHelixDataAccessor(Arrays.asList("GobblinYarnTaskRunner-1",
"GobblinYarnTaskRunner-2"));
YarnAutoScalingManager.YarnAutoScalingRunnable runnable =
new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver,
mockYarnService, 1,
@@ -162,59 +124,24 @@ public class YarnAutoScalingManagerTest {
* Test for two workflows
*/
@Test
- public void testTwoWorkflows() throws IOException {
+ public void testTwoWorkflows() {
YarnService mockYarnService = mock(YarnService.class);
TaskDriver mockTaskDriver = mock(TaskDriver.class);
- WorkflowConfig mockWorkflowConfig1 = mock(WorkflowConfig.class);
- JobDag mockJobDag1 = mock(JobDag.class);
-
- Mockito.when(mockJobDag1.getAllNodes()).thenReturn(ImmutableSet.of("job1",
"job2"));
- Mockito.when(mockWorkflowConfig1.getJobDag()).thenReturn(mockJobDag1);
-
- WorkflowContext mockWorkflowContext1 = mock(WorkflowContext.class);
-
Mockito.when(mockWorkflowContext1.getWorkflowState()).thenReturn(TaskState.IN_PROGRESS);
-
-
Mockito.when(mockTaskDriver.getWorkflowContext("workflow1")).thenReturn(mockWorkflowContext1);
-
- JobContext mockJobContext1 = mock(JobContext.class);
- Mockito.when(mockJobContext1.getPartitionSet())
- .thenReturn(ImmutableSet.of(Integer.valueOf(1), Integer.valueOf(2)));
-
Mockito.when(mockJobContext1.getAssignedParticipant(2)).thenReturn("GobblinYarnTaskRunner-1");
-
Mockito.when(mockTaskDriver.getJobContext("job1")).thenReturn(mockJobContext1);
-
- JobContext mockJobContext2 = mock(JobContext.class);
- Mockito.when(mockJobContext2.getPartitionSet())
- .thenReturn(ImmutableSet.of(Integer.valueOf(3)));
-
Mockito.when(mockJobContext2.getAssignedParticipant(3)).thenReturn("GobblinYarnTaskRunner-2");
-
Mockito.when(mockTaskDriver.getJobContext("job2")).thenReturn(mockJobContext2);
-
- WorkflowConfig mockWorkflowConfig2 = mock(WorkflowConfig.class);
- JobDag mockJobDag2 = mock(JobDag.class);
-
-
Mockito.when(mockJobDag2.getAllNodes()).thenReturn(ImmutableSet.of("job3"));
- Mockito.when(mockWorkflowConfig2.getJobDag()).thenReturn(mockJobDag2);
-
- WorkflowContext mockWorkflowContext2 = mock(WorkflowContext.class);
-
Mockito.when(mockWorkflowContext2.getWorkflowState()).thenReturn(TaskState.IN_PROGRESS);
-
-
Mockito.when(mockTaskDriver.getWorkflowContext("workflow2")).thenReturn(mockWorkflowContext2);
-
- JobContext mockJobContext3 = mock(JobContext.class);
- Mockito.when(mockJobContext3.getPartitionSet())
- .thenReturn(ImmutableSet.of(Integer.valueOf(4), Integer.valueOf(5)));
-
Mockito.when(mockJobContext3.getAssignedParticipant(4)).thenReturn("GobblinYarnTaskRunner-3");
-
Mockito.when(mockTaskDriver.getJobContext("job3")).thenReturn(mockJobContext3);
+ WorkflowConfig mockWorkflowConfig1 =
+ getWorkflowConfig(mockTaskDriver, ImmutableSet.of("job1", "job2"),
TaskState.IN_PROGRESS, TargetState.START, "workflow1");
+ WorkflowConfig mockWorkflowConfig2 =
+ getWorkflowConfig(mockTaskDriver, ImmutableSet.of("job3"),
TaskState.IN_PROGRESS, TargetState.START, "workflow2");
Mockito.when(mockTaskDriver.getWorkflows())
.thenReturn(ImmutableMap.of("workflow1", mockWorkflowConfig1,
"workflow2", mockWorkflowConfig2));
- HelixDataAccessor helixDataAccessor = mock(HelixDataAccessor.class);
- Mockito.when(helixDataAccessor.keyBuilder()).thenReturn(new
PropertyKey.Builder("cluster"));
- Mockito.when(helixDataAccessor.getChildValuesMap(Mockito.any()))
- .thenReturn(ImmutableMap.of("GobblinYarnTaskRunner-1", new
HelixProperty(""),
- "GobblinYarnTaskRunner-2", new HelixProperty(""),
- "GobblinYarnTaskRunner-3", new HelixProperty("")));
+ getJobContext(mockTaskDriver, ImmutableMap.of(2,
"GobblinYarnTaskRunner-1"), "job1", ImmutableSet.of(1, 2));
+ getJobContext(mockTaskDriver, ImmutableMap.of(3,
"GobblinYarnTaskRunner-2"), "job2");
+ getJobContext(mockTaskDriver, ImmutableMap.of(4,
"GobblinYarnTaskRunner-3"), "job3", ImmutableSet.of(4,5));
+
+ HelixDataAccessor helixDataAccessor = getHelixDataAccessor(Arrays.asList(
+ "GobblinYarnTaskRunner-1",
"GobblinYarnTaskRunner-2","GobblinYarnTaskRunner-3"));
YarnAutoScalingManager.YarnAutoScalingRunnable runnable =
new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver,
mockYarnService, 1,
@@ -223,70 +150,37 @@ public class YarnAutoScalingManagerTest {
runnable.run();
// 5 containers requested and 3 workers in use
- ArgumentCaptor<YarnContainerRequestBundle> argument =
ArgumentCaptor.forClass(YarnContainerRequestBundle.class);
- Mockito.verify(mockYarnService, times(1)).
- requestTargetNumberOfContainers(argument.capture(),
- eq(ImmutableSet.of("GobblinYarnTaskRunner-1",
"GobblinYarnTaskRunner-2", "GobblinYarnTaskRunner-3")));
- Assert.assertEquals(argument.getValue().getTotalContainers(), 5);
+ assertContainerRequest(mockYarnService, 5,
+ ImmutableSet.of("GobblinYarnTaskRunner-1", "GobblinYarnTaskRunner-2",
"GobblinYarnTaskRunner-3"));
}
/**
- * Test for two workflows with one not in progress.
- * The partitions for the workflow that is not in progress should not be
counted.
+ * Test for three workflows with one not in progress and one marked for
delete.
+ * The partitions for the workflow that is not in progress or is marked for
delete should not be counted.
*/
@Test
- public void testNotInProgress() throws IOException {
+ public void testNotInProgressOrBeingDeleted() {
YarnService mockYarnService = mock(YarnService.class);
TaskDriver mockTaskDriver = mock(TaskDriver.class);
- WorkflowConfig mockWorkflowConfig1 = mock(WorkflowConfig.class);
- JobDag mockJobDag1 = mock(JobDag.class);
-
- Mockito.when(mockJobDag1.getAllNodes()).thenReturn(ImmutableSet.of("job1",
"job2"));
- Mockito.when(mockWorkflowConfig1.getJobDag()).thenReturn(mockJobDag1);
-
- WorkflowContext mockWorkflowContext1 = mock(WorkflowContext.class);
-
Mockito.when(mockWorkflowContext1.getWorkflowState()).thenReturn(TaskState.IN_PROGRESS);
-
-
Mockito.when(mockTaskDriver.getWorkflowContext("workflow1")).thenReturn(mockWorkflowContext1);
-
- JobContext mockJobContext1 = mock(JobContext.class);
- Mockito.when(mockJobContext1.getPartitionSet())
- .thenReturn(ImmutableSet.of(Integer.valueOf(1), Integer.valueOf(2)));
-
Mockito.when(mockJobContext1.getAssignedParticipant(2)).thenReturn("GobblinYarnTaskRunner-1");
-
Mockito.when(mockTaskDriver.getJobContext("job1")).thenReturn(mockJobContext1);
-
- JobContext mockJobContext2 = mock(JobContext.class);
- Mockito.when(mockJobContext2.getPartitionSet())
- .thenReturn(ImmutableSet.of(Integer.valueOf(3)));
-
Mockito.when(mockJobContext2.getAssignedParticipant(3)).thenReturn("GobblinYarnTaskRunner-2");
-
Mockito.when(mockTaskDriver.getJobContext("job2")).thenReturn(mockJobContext2);
-
- WorkflowConfig mockWorkflowConfig2 = mock(WorkflowConfig.class);
- JobDag mockJobDag2 = mock(JobDag.class);
-
-
Mockito.when(mockJobDag2.getAllNodes()).thenReturn(ImmutableSet.of("job3"));
- Mockito.when(mockWorkflowConfig2.getJobDag()).thenReturn(mockJobDag2);
-
- WorkflowContext mockWorkflowContext2 = mock(WorkflowContext.class);
-
Mockito.when(mockWorkflowContext2.getWorkflowState()).thenReturn(TaskState.COMPLETED);
-
-
Mockito.when(mockTaskDriver.getWorkflowContext("workflow2")).thenReturn(mockWorkflowContext2);
-
- JobContext mockJobContext3 = mock(JobContext.class);
- Mockito.when(mockJobContext3.getPartitionSet())
- .thenReturn(ImmutableSet.of(Integer.valueOf(4), Integer.valueOf(5)));
-
Mockito.when(mockJobContext3.getAssignedParticipant(4)).thenReturn("GobblinYarnTaskRunner-3");
-
Mockito.when(mockTaskDriver.getJobContext("job3")).thenReturn(mockJobContext3);
-
- Mockito.when(mockTaskDriver.getWorkflows())
- .thenReturn(ImmutableMap.of("workflow1", mockWorkflowConfig1,
"workflow2", mockWorkflowConfig2));
-
- HelixDataAccessor helixDataAccessor = mock(HelixDataAccessor.class);
- Mockito.when(helixDataAccessor.keyBuilder()).thenReturn(new
PropertyKey.Builder("cluster"));
- Mockito.when(helixDataAccessor.getChildValuesMap(Mockito.any()))
- .thenReturn(ImmutableMap.of("GobblinYarnTaskRunner-1", new
HelixProperty(""),
- "GobblinYarnTaskRunner-2", new HelixProperty("")));
+ WorkflowConfig workflowInProgress = getWorkflowConfig(mockTaskDriver,
ImmutableSet.of("job-inProgress-1", "job-inProgress-2"), TaskState.IN_PROGRESS,
TargetState.START, "workflowInProgress");
+ WorkflowConfig workflowCompleted = getWorkflowConfig(mockTaskDriver,
ImmutableSet.of("job-complete-1"), TaskState.COMPLETED, TargetState.STOP,
"workflowCompleted");
+ WorkflowConfig workflowSetToBeDeleted = getWorkflowConfig(mockTaskDriver,
ImmutableSet.of("job-setToDelete-1"), TaskState.IN_PROGRESS,
TargetState.DELETE, "workflowSetToBeDeleted");
+ Mockito.when(mockTaskDriver.getWorkflows()).thenReturn(ImmutableMap.of(
+ "workflowInProgress", workflowInProgress,
+ "workflowCompleted", workflowCompleted,
+ "workflowSetToBeDeleted", workflowSetToBeDeleted));
+
+ getJobContext(mockTaskDriver, ImmutableMap.of(1,
"GobblinYarnTaskRunner-1"), "job-inProgress-1",
+ ImmutableSet.of(1,2));
+ getJobContext(mockTaskDriver, ImmutableMap.of(2,
"GobblinYarnTaskRunner-2"), "job-inProgress-2");
+ getJobContext(mockTaskDriver, ImmutableMap.of(1,
"GobblinYarnTaskRunner-3"), "job-setToDelete-1");
+ getJobContext(mockTaskDriver, ImmutableMap.of(1,
"GobblinYarnTaskRunner-4"), "job-complete-1",
+ ImmutableSet.of(1, 5));
+
+ HelixDataAccessor helixDataAccessor = getHelixDataAccessor(
+ Arrays.asList("GobblinClusterManager",
+ "GobblinYarnTaskRunner-1", "GobblinYarnTaskRunner-2",
"GobblinYarnTaskRunner-3", "GobblinYarnTaskRunner-4"));
YarnAutoScalingManager.YarnAutoScalingRunnable runnable =
new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver,
mockYarnService, 1,
@@ -294,46 +188,26 @@ public class YarnAutoScalingManagerTest {
runnable.run();
- // 3 containers requested and 2 workers in use
- ArgumentCaptor<YarnContainerRequestBundle> argument =
ArgumentCaptor.forClass(YarnContainerRequestBundle.class);
- Mockito.verify(mockYarnService, times(1)).
- requestTargetNumberOfContainers(argument.capture(),
- eq(ImmutableSet.of("GobblinYarnTaskRunner-1",
"GobblinYarnTaskRunner-2")));
- Assert.assertEquals(argument.getValue().getTotalContainers(), 3);
+ // 3 containers requested and 4 workers in use
+ assertContainerRequest(mockYarnService, 3,
+ ImmutableSet.of("GobblinYarnTaskRunner-1", "GobblinYarnTaskRunner-2",
"GobblinYarnTaskRunner-3", "GobblinYarnTaskRunner-4"));
}
/**
* Test multiple partitions to one container
*/
@Test
- public void testMultiplePartitionsPerContainer() throws IOException {
+ public void testMultiplePartitionsPerContainer() {
YarnService mockYarnService = mock(YarnService.class);
TaskDriver mockTaskDriver = mock(TaskDriver.class);
- WorkflowConfig mockWorkflowConfig = mock(WorkflowConfig.class);
- JobDag mockJobDag = mock(JobDag.class);
-
- Mockito.when(mockJobDag.getAllNodes()).thenReturn(ImmutableSet.of("job1"));
- Mockito.when(mockWorkflowConfig.getJobDag()).thenReturn(mockJobDag);
+ WorkflowConfig mockWorkflowConfig =
+ getWorkflowConfig(mockTaskDriver, ImmutableSet.of("job1"),
TaskState.IN_PROGRESS, TargetState.START, "workflow1");
Mockito.when(mockTaskDriver.getWorkflows())
.thenReturn(ImmutableMap.of("workflow1", mockWorkflowConfig));
- WorkflowContext mockWorkflowContext = mock(WorkflowContext.class);
-
Mockito.when(mockWorkflowContext.getWorkflowState()).thenReturn(TaskState.IN_PROGRESS);
-
-
Mockito.when(mockTaskDriver.getWorkflowContext("workflow1")).thenReturn(mockWorkflowContext);
-
- JobContext mockJobContext = mock(JobContext.class);
- Mockito.when(mockJobContext.getPartitionSet())
- .thenReturn(ImmutableSet.of(Integer.valueOf(1), Integer.valueOf(2)));
-
Mockito.when(mockJobContext.getAssignedParticipant(2)).thenReturn("GobblinYarnTaskRunner-1");
-
-
Mockito.when(mockTaskDriver.getJobContext("job1")).thenReturn(mockJobContext);
-
- HelixDataAccessor helixDataAccessor = mock(HelixDataAccessor.class);
- Mockito.when(helixDataAccessor.keyBuilder()).thenReturn(new
PropertyKey.Builder("cluster"));
- Mockito.when(helixDataAccessor.getChildValuesMap(Mockito.any()))
- .thenReturn(ImmutableMap.of("GobblinYarnTaskRunner-1", new
HelixProperty("")));
+ getJobContext(mockTaskDriver, ImmutableMap.of(2,
"GobblinYarnTaskRunner-1"), "job1");
+ HelixDataAccessor helixDataAccessor =
getHelixDataAccessor(Arrays.asList("GobblinYarnTaskRunner-1"));
YarnAutoScalingManager.YarnAutoScalingRunnable runnable =
new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver,
mockYarnService, 2,
@@ -342,42 +216,21 @@ public class YarnAutoScalingManagerTest {
runnable.run();
// 1 container requested since 2 partitions and limit is 2 partitions per
container. One worker in use.
- ArgumentCaptor<YarnContainerRequestBundle> argument =
ArgumentCaptor.forClass(YarnContainerRequestBundle.class);
- Mockito.verify(mockYarnService, times(1)).
- requestTargetNumberOfContainers(argument.capture(),
- eq(ImmutableSet.of("GobblinYarnTaskRunner-1")));
- Assert.assertEquals(argument.getValue().getTotalContainers(), 1);
+ assertContainerRequest(mockYarnService, 1,
ImmutableSet.of("GobblinYarnTaskRunner-1"));
}
@Test
public void testOverprovision() {
YarnService mockYarnService = mock(YarnService.class);
TaskDriver mockTaskDriver = mock(TaskDriver.class);
- WorkflowConfig mockWorkflowConfig = mock(WorkflowConfig.class);
- JobDag mockJobDag = mock(JobDag.class);
-
- Mockito.when(mockJobDag.getAllNodes()).thenReturn(ImmutableSet.of("job1"));
- Mockito.when(mockWorkflowConfig.getJobDag()).thenReturn(mockJobDag);
-
+ WorkflowConfig mockWorkflowConfig =
+ getWorkflowConfig(mockTaskDriver, ImmutableSet.of("job1"),
TaskState.IN_PROGRESS, TargetState.START, "workflow1");
Mockito.when(mockTaskDriver.getWorkflows())
.thenReturn(ImmutableMap.of("workflow1", mockWorkflowConfig));
- WorkflowContext mockWorkflowContext = mock(WorkflowContext.class);
-
Mockito.when(mockWorkflowContext.getWorkflowState()).thenReturn(TaskState.IN_PROGRESS);
-
-
Mockito.when(mockTaskDriver.getWorkflowContext("workflow1")).thenReturn(mockWorkflowContext);
-
- JobContext mockJobContext = mock(JobContext.class);
- Mockito.when(mockJobContext.getPartitionSet())
- .thenReturn(ImmutableSet.of(Integer.valueOf(1), Integer.valueOf(2)));
-
Mockito.when(mockJobContext.getAssignedParticipant(2)).thenReturn("GobblinYarnTaskRunner-1");
-
-
Mockito.when(mockTaskDriver.getJobContext("job1")).thenReturn(mockJobContext);
+ getJobContext(mockTaskDriver, ImmutableMap.of(2,
"GobblinYarnTaskRunner-1"), "job1", ImmutableSet.of(1, 2));
- HelixDataAccessor helixDataAccessor = mock(HelixDataAccessor.class);
- Mockito.when(helixDataAccessor.keyBuilder()).thenReturn(new
PropertyKey.Builder("cluster"));
- Mockito.when(helixDataAccessor.getChildValuesMap(Mockito.any()))
- .thenReturn(ImmutableMap.of("GobblinYarnTaskRunner-1", new
HelixProperty("")));
+ HelixDataAccessor helixDataAccessor =
getHelixDataAccessor(Arrays.asList("GobblinYarnTaskRunner-1"));
YarnAutoScalingManager.YarnAutoScalingRunnable runnable1 =
new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver,
mockYarnService, 1,
@@ -388,13 +241,9 @@ public class YarnAutoScalingManagerTest {
// 3 containers requested to max and one worker in use
// NumPartitions = 2, Partitions per container = 1 and overprovision = 1.2
// so targetNumContainers = Ceil((2/1) * 1.2) = 3.
- ArgumentCaptor<YarnContainerRequestBundle> argument =
ArgumentCaptor.forClass(YarnContainerRequestBundle.class);
- Mockito.verify(mockYarnService, times(1)).
- requestTargetNumberOfContainers(argument.capture(),
- eq(ImmutableSet.of("GobblinYarnTaskRunner-1")));
- Assert.assertEquals(argument.getValue().getTotalContainers(), 3);
-
+ assertContainerRequest(mockYarnService, 3,
ImmutableSet.of("GobblinYarnTaskRunner-1"));
Mockito.reset(mockYarnService);
+
YarnAutoScalingManager.YarnAutoScalingRunnable runnable2 =
new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver,
mockYarnService, 1,
0.1, noopQueue, helixDataAccessor, defaultHelixTag,
defaultContainerMemory, defaultContainerCores);
@@ -404,10 +253,7 @@ public class YarnAutoScalingManagerTest {
// 1 container requested and one worker in use
// NumPartitions = 2, Partitions per container = 1 and overprovision = 0.1
// so targetNumContainers = Ceil((2/1) * 0.1) = 1.
- Mockito.verify(mockYarnService, times(1)).
- requestTargetNumberOfContainers(argument.capture(),
- eq(ImmutableSet.of("GobblinYarnTaskRunner-1")));
- Assert.assertEquals(argument.getValue().getTotalContainers(), 1);
+ assertContainerRequest(mockYarnService, 1,
ImmutableSet.of("GobblinYarnTaskRunner-1"));
Mockito.reset(mockYarnService);
YarnAutoScalingManager.YarnAutoScalingRunnable runnable3 =
@@ -419,44 +265,22 @@ public class YarnAutoScalingManagerTest {
// 12 containers requested and one worker in use
// NumPartitions = 2, Partitions per container = 1 and overprovision = 6.0,
// so targetNumContainers = Ceil((2/1) * 6.0) = 12.
- Mockito.verify(mockYarnService, times(1)).
- requestTargetNumberOfContainers(argument.capture(),
- eq(ImmutableSet.of("GobblinYarnTaskRunner-1")));
- Assert.assertEquals(argument.getValue().getTotalContainers(), 12);
+ assertContainerRequest(mockYarnService, 12,
ImmutableSet.of("GobblinYarnTaskRunner-1"));
}
/**
* Test suppressed exception
*/
@Test
- public void testSuppressedException() throws IOException {
+ public void testSuppressedException() {
YarnService mockYarnService = mock(YarnService.class);
TaskDriver mockTaskDriver = mock(TaskDriver.class);
- WorkflowConfig mockWorkflowConfig = mock(WorkflowConfig.class);
- JobDag mockJobDag = mock(JobDag.class);
+ WorkflowConfig mockWorkflowConfig = getWorkflowConfig(mockTaskDriver,
ImmutableSet.of("job1"), TaskState.IN_PROGRESS, TargetState.START, "workflow1");
+
Mockito.when(mockTaskDriver.getWorkflows()).thenReturn(ImmutableMap.of("workflow1",
mockWorkflowConfig));
- Mockito.when(mockJobDag.getAllNodes()).thenReturn(ImmutableSet.of("job1"));
- Mockito.when(mockWorkflowConfig.getJobDag()).thenReturn(mockJobDag);
+ getJobContext(mockTaskDriver, ImmutableMap.of(2,
"GobblinYarnTaskRunner-1"), "job1", ImmutableSet.of(1, 2));
- Mockito.when(mockTaskDriver.getWorkflows())
- .thenReturn(ImmutableMap.of("workflow1", mockWorkflowConfig));
-
- WorkflowContext mockWorkflowContext = mock(WorkflowContext.class);
-
Mockito.when(mockWorkflowContext.getWorkflowState()).thenReturn(TaskState.IN_PROGRESS);
-
-
Mockito.when(mockTaskDriver.getWorkflowContext("workflow1")).thenReturn(mockWorkflowContext);
-
- JobContext mockJobContext = mock(JobContext.class);
- Mockito.when(mockJobContext.getPartitionSet())
- .thenReturn(ImmutableSet.of(Integer.valueOf(1), Integer.valueOf(2)));
-
Mockito.when(mockJobContext.getAssignedParticipant(2)).thenReturn("GobblinYarnTaskRunner-1");
-
-
Mockito.when(mockTaskDriver.getJobContext("job1")).thenReturn(mockJobContext);
-
- HelixDataAccessor helixDataAccessor = mock(HelixDataAccessor.class);
- Mockito.when(helixDataAccessor.keyBuilder()).thenReturn(new
PropertyKey.Builder("cluster"));
- Mockito.when(helixDataAccessor.getChildValuesMap(Mockito.any()))
- .thenReturn(ImmutableMap.of("GobblinYarnTaskRunner-1", new
HelixProperty("")));
+ HelixDataAccessor helixDataAccessor =
getHelixDataAccessor(Arrays.asList("GobblinYarnTaskRunner-1"));
TestYarnAutoScalingRunnable runnable =
new TestYarnAutoScalingRunnable(mockTaskDriver, mockYarnService, 1,
helixDataAccessor);
@@ -471,14 +295,12 @@ public class YarnAutoScalingManagerTest {
Mockito.reset(mockYarnService);
runnable.setRaiseException(false);
runnable.run();
+
// 2 container requested
- Mockito.verify(mockYarnService, times(1)).
- requestTargetNumberOfContainers(argument.capture(),
- eq(ImmutableSet.of("GobblinYarnTaskRunner-1")));
- Assert.assertEquals(argument.getValue().getTotalContainers(), 2);
+ assertContainerRequest(mockYarnService, 2,
ImmutableSet.of("GobblinYarnTaskRunner-1"));
}
- public void testMaxValueEvictingQueue() throws Exception {
+ public void testMaxValueEvictingQueue() {
Resource resource = Resource.newInstance(16, 1);
YarnAutoScalingManager.SlidingWindowReservoir window = new
YarnAutoScalingManager.SlidingWindowReservoir(3, 10);
// Normal insertion with eviction of originally largest value
@@ -503,36 +325,16 @@ public class YarnAutoScalingManagerTest {
* candidate for scaling-down.
*/
@Test
- public void testInstanceIdleBeyondTolerance() throws IOException {
+ public void testInstanceIdleBeyondTolerance() {
YarnService mockYarnService = mock(YarnService.class);
TaskDriver mockTaskDriver = mock(TaskDriver.class);
- WorkflowConfig mockWorkflowConfig = mock(WorkflowConfig.class);
- JobDag mockJobDag = mock(JobDag.class);
- Mockito.when(mockJobDag.getAllNodes()).thenReturn(ImmutableSet.of("job1"));
- Mockito.when(mockWorkflowConfig.getJobDag()).thenReturn(mockJobDag);
-
- Mockito.when(mockTaskDriver.getWorkflows())
- .thenReturn(ImmutableMap.of("workflow1", mockWorkflowConfig));
-
- WorkflowContext mockWorkflowContext = mock(WorkflowContext.class);
-
Mockito.when(mockWorkflowContext.getWorkflowState()).thenReturn(TaskState.IN_PROGRESS);
-
-
Mockito.when(mockTaskDriver.getWorkflowContext("workflow1")).thenReturn(mockWorkflowContext);
+ WorkflowConfig mockWorkflowConfig = getWorkflowConfig(mockTaskDriver,
ImmutableSet.of("job1"), TaskState.IN_PROGRESS, TargetState.START, "workflow1");
+
Mockito.when(mockTaskDriver.getWorkflows()).thenReturn(ImmutableMap.of("workflow1",
mockWorkflowConfig));
// Both partitions are initially assigned to a single instance, in this
case GobblinYarnTaskRunner-2
- JobContext mockJobContext = mock(JobContext.class);
- Mockito.when(mockJobContext.getPartitionSet())
- .thenReturn(ImmutableSet.of(Integer.valueOf(1), Integer.valueOf(2)));
-
Mockito.when(mockJobContext.getAssignedParticipant(1)).thenReturn("GobblinYarnTaskRunner-2");
-
Mockito.when(mockJobContext.getAssignedParticipant(2)).thenReturn("GobblinYarnTaskRunner-2");
+ getJobContext(mockTaskDriver, ImmutableMap.of(1,"GobblinYarnTaskRunner-2",
2, "GobblinYarnTaskRunner-2"), "job1");
-
Mockito.when(mockTaskDriver.getJobContext("job1")).thenReturn(mockJobContext);
-
- HelixDataAccessor helixDataAccessor = mock(HelixDataAccessor.class);
- Mockito.when(helixDataAccessor.keyBuilder()).thenReturn(new
PropertyKey.Builder("cluster"));
- Mockito.when(helixDataAccessor.getChildValuesMap(Mockito.any()))
- .thenReturn(ImmutableMap.of("GobblinYarnTaskRunner-1", new
HelixProperty(""),
- "GobblinYarnTaskRunner-2", new HelixProperty("")));
+ HelixDataAccessor helixDataAccessor =
getHelixDataAccessor(Arrays.asList("GobblinYarnTaskRunner-1",
"GobblinYarnTaskRunner-2"));
TestYarnAutoScalingRunnable runnable = new
TestYarnAutoScalingRunnable(mockTaskDriver, mockYarnService,
1, helixDataAccessor);
@@ -541,21 +343,14 @@ public class YarnAutoScalingManagerTest {
// 2 containers requested and one worker in use, while the evaluation will
hold for true if not set externally,
// still tell YarnService there are two instances being used.
- ArgumentCaptor<YarnContainerRequestBundle> argument =
ArgumentCaptor.forClass(YarnContainerRequestBundle.class);
- Mockito.verify(mockYarnService, times(1)).
- requestTargetNumberOfContainers(argument.capture(),
- eq(ImmutableSet.of("GobblinYarnTaskRunner-1",
"GobblinYarnTaskRunner-2")));
- Assert.assertEquals(argument.getValue().getTotalContainers(), 2);
+ assertContainerRequest(mockYarnService, 2,
ImmutableSet.of("GobblinYarnTaskRunner-1", "GobblinYarnTaskRunner-2"));
// Set failEvaluation which simulates the "beyond tolerance" case.
Mockito.reset(mockYarnService);
runnable.setAlwaysTagUnused(true);
runnable.run();
- Mockito.verify(mockYarnService, times(1)).
- requestTargetNumberOfContainers(argument.capture(),
- eq(ImmutableSet.of("GobblinYarnTaskRunner-2")));
- Assert.assertEquals(argument.getValue().getTotalContainers(), 2);
+ assertContainerRequest(mockYarnService, 2,
ImmutableSet.of("GobblinYarnTaskRunner-2"));
}
@Test
@@ -563,63 +358,29 @@ public class YarnAutoScalingManagerTest {
YarnService mockYarnService = mock(YarnService.class);
TaskDriver mockTaskDriver = mock(TaskDriver.class);
- WorkflowConfig mockWorkflowConfig1 = mock(WorkflowConfig.class);
- JobDag mockJobDag1 = mock(JobDag.class);
-
- Mockito.when(mockJobDag1.getAllNodes()).thenReturn(ImmutableSet.of("job1",
"job2"));
- Mockito.when(mockWorkflowConfig1.getJobDag()).thenReturn(mockJobDag1);
-
- WorkflowContext mockWorkflowContext1 = mock(WorkflowContext.class);
-
Mockito.when(mockWorkflowContext1.getWorkflowState()).thenReturn(TaskState.IN_PROGRESS);
-
-
Mockito.when(mockTaskDriver.getWorkflowContext("workflow1")).thenReturn(mockWorkflowContext1);
-
- JobContext mockJobContext1 = mock(JobContext.class);
- Mockito.when(mockJobContext1.getPartitionSet())
- .thenReturn(ImmutableSet.of(Integer.valueOf(1), Integer.valueOf(2)));
-
Mockito.when(mockJobContext1.getAssignedParticipant(2)).thenReturn("GobblinYarnTaskRunner-1");
-
Mockito.when(mockTaskDriver.getJobContext("job1")).thenReturn(mockJobContext1);
-
- JobContext mockJobContext2 = mock(JobContext.class);
- Mockito.when(mockJobContext2.getPartitionSet())
- .thenReturn(ImmutableSet.of(Integer.valueOf(3)));
-
Mockito.when(mockJobContext2.getAssignedParticipant(3)).thenReturn("GobblinYarnTaskRunner-2");
-
Mockito.when(mockTaskDriver.getJobContext("job2")).thenReturn(mockJobContext2);
-
- WorkflowConfig mockWorkflowConfig2 = mock(WorkflowConfig.class);
- JobDag mockJobDag2 = mock(JobDag.class);
-
-
Mockito.when(mockJobDag2.getAllNodes()).thenReturn(ImmutableSet.of("job3"));
- Mockito.when(mockWorkflowConfig2.getJobDag()).thenReturn(mockJobDag2);
-
- WorkflowContext mockWorkflowContext2 = mock(WorkflowContext.class);
-
Mockito.when(mockWorkflowContext2.getWorkflowState()).thenReturn(TaskState.IN_PROGRESS);
+ WorkflowConfig mockWorkflowConfig1 =
+ getWorkflowConfig(mockTaskDriver, ImmutableSet.of("job1", "job2"),
TaskState.IN_PROGRESS, TargetState.START, "workflow1");
+ WorkflowConfig mockWorkflowConfig2 =
+ getWorkflowConfig(mockTaskDriver, ImmutableSet.of("job3"),
TaskState.IN_PROGRESS, TargetState.START, "workflow2");
+ Mockito.when(mockTaskDriver.getWorkflows())
+ .thenReturn(ImmutableMap.of("workflow1", mockWorkflowConfig1,
"workflow2", mockWorkflowConfig2));
-
Mockito.when(mockTaskDriver.getWorkflowContext("workflow2")).thenReturn(mockWorkflowContext2);
+ getJobContext(mockTaskDriver, ImmutableMap.of(2,
"GobblinYarnTaskRunner-1"), "job1", ImmutableSet.of(1, 2));
+ getJobContext(mockTaskDriver, ImmutableMap.of(3,
"GobblinYarnTaskRunner-2"), "job2");
+ getJobContext(mockTaskDriver, ImmutableMap.of(4,
"GobblinYarnTaskRunner-3"), "job3", ImmutableSet.of(4, 5));
- JobContext mockJobContext3 = mock(JobContext.class);
- Mockito.when(mockJobContext3.getPartitionSet())
- .thenReturn(ImmutableSet.of(Integer.valueOf(4), Integer.valueOf(5)));
-
Mockito.when(mockJobContext3.getAssignedParticipant(4)).thenReturn("GobblinYarnTaskRunner-3");
-
Mockito.when(mockTaskDriver.getJobContext("job3")).thenReturn(mockJobContext3);
JobConfig mockJobConfig3 = mock(JobConfig.class);
+
Mockito.when(mockTaskDriver.getJobConfig("job3")).thenReturn(mockJobConfig3);
String helixTag = "test-Tag1";
- Map<String, String> resourceMap = new HashMap<>();
-
resourceMap.put(GobblinClusterConfigurationKeys.HELIX_JOB_CONTAINER_MEMORY_MBS,
"512");
- resourceMap.put(GobblinClusterConfigurationKeys.HELIX_JOB_CONTAINER_CORES,
"8");
+ Map<String, String> resourceMap = ImmutableMap.of(
+ GobblinClusterConfigurationKeys.HELIX_JOB_CONTAINER_MEMORY_MBS, "512",
+ GobblinClusterConfigurationKeys.HELIX_JOB_CONTAINER_CORES, "8"
+ );
Mockito.when(mockJobConfig3.getInstanceGroupTag()).thenReturn(helixTag);
Mockito.when(mockJobConfig3.getJobCommandConfigMap()).thenReturn(resourceMap);
-
Mockito.when(mockTaskDriver.getJobContext("job3")).thenReturn(mockJobContext3);
-
Mockito.when(mockTaskDriver.getJobConfig("job3")).thenReturn(mockJobConfig3);
- Mockito.when(mockTaskDriver.getWorkflows())
- .thenReturn(ImmutableMap.of("workflow1", mockWorkflowConfig1,
"workflow2", mockWorkflowConfig2));
- HelixDataAccessor helixDataAccessor = mock(HelixDataAccessor.class);
- Mockito.when(helixDataAccessor.keyBuilder()).thenReturn(new
PropertyKey.Builder("cluster"));
- Mockito.when(helixDataAccessor.getChildValuesMap(Mockito.any()))
- .thenReturn(ImmutableMap.of("GobblinYarnTaskRunner-1", new
HelixProperty(""),
- "GobblinYarnTaskRunner-2", new HelixProperty(""),
- "GobblinYarnTaskRunner-3", new HelixProperty("")));
+ HelixDataAccessor helixDataAccessor = getHelixDataAccessor(
+ Arrays.asList("GobblinYarnTaskRunner-1", "GobblinYarnTaskRunner-2",
"GobblinYarnTaskRunner-3"));
YarnAutoScalingManager.YarnAutoScalingRunnable runnable =
new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver,
mockYarnService, 1,
@@ -629,22 +390,76 @@ public class YarnAutoScalingManagerTest {
// 5 containers requested and 3 workers in use
ArgumentCaptor<YarnContainerRequestBundle> argument =
ArgumentCaptor.forClass(YarnContainerRequestBundle.class);
- Mockito.verify(mockYarnService, times(1)).
- requestTargetNumberOfContainers(argument.capture(),
- eq(ImmutableSet.of("GobblinYarnTaskRunner-1",
"GobblinYarnTaskRunner-2", "GobblinYarnTaskRunner-3")));
- Assert.assertEquals(argument.getValue().getTotalContainers(), 5);
+ assertContainerRequest(argument, mockYarnService, 5,
ImmutableSet.of("GobblinYarnTaskRunner-1", "GobblinYarnTaskRunner-2",
"GobblinYarnTaskRunner-3"));
+
+ // Verify that 3 containers requested with default tag and resource
setting,
+ // while 2 with specific helix tag and resource requirement
Map<String, Set<String>> resourceHelixTagMap =
argument.getValue().getResourceHelixTagMap();
Map<String, Resource> helixTagResourceMap =
argument.getValue().getHelixTagResourceMap();
Map<String, Integer> helixTagContainerCountMap =
argument.getValue().getHelixTagContainerCountMap();
- // Verify that 3 containers requested with default tag and resource
setting,
- // while 2 with specific helix tag and resource requirement
Assert.assertEquals(resourceHelixTagMap.size(), 2);
Assert.assertEquals(helixTagResourceMap.get(helixTag),
Resource.newInstance(512, 8));
Assert.assertEquals(helixTagResourceMap.get(defaultHelixTag),
Resource.newInstance(defaultContainerMemory, defaultContainerCores));
Assert.assertEquals((int) helixTagContainerCountMap.get(helixTag), 2);
Assert.assertEquals((int) helixTagContainerCountMap.get(defaultHelixTag),
3);
+ }
+
+ /**
+  * Builds a mocked {@link HelixDataAccessor} for the given task runners.
+  *
+  * <p>{@code keyBuilder()} is stubbed with a builder for the cluster named "cluster", and
+  * {@code getChildValuesMap(...)} returns, for any key, a map holding one empty
+  * {@link HelixProperty} per entry of {@code taskRunners}.
+  *
+  * @param taskRunners names of the task runner instances to expose; used as the map keys
+  * @return the stubbed accessor mock
+  */
+ private HelixDataAccessor getHelixDataAccessor(List<String> taskRunners) {
+   HelixDataAccessor accessor = mock(HelixDataAccessor.class);
+   Mockito.when(accessor.keyBuilder()).thenReturn(new PropertyKey.Builder("cluster"));
+
+   // One placeholder property per runner, keyed by the runner's name.
+   Map<String, HelixProperty> runnerProperties = taskRunners.stream()
+       .collect(Collectors.toMap(runner -> runner, runner -> new HelixProperty("")));
+   Mockito.when(accessor.getChildValuesMap(Mockito.any())).thenReturn(runnerProperties);
+   return accessor;
+ }
+
+ /**
+  * Builds a mocked {@link WorkflowConfig} and wires its {@link WorkflowContext} into the task driver.
+  *
+  * <p>The returned config reports a {@link JobDag} whose nodes are {@code jobNames} and the given
+  * {@code targetState}. As a side effect, {@code mockTaskDriver.getWorkflowContext(workflowName)}
+  * is stubbed to return a context whose workflow state is {@code taskState}. This helper does not
+  * stub {@code mockTaskDriver.getWorkflows()}.
+  *
+  * @param mockTaskDriver task driver mock that will serve the workflow context
+  * @param jobNames names of the jobs that make up the workflow's DAG
+  * @param taskState workflow state reported by the mocked context
+  * @param targetState target state reported by the mocked config
+  * @param workflowName name under which the context is registered on the task driver
+  * @return the stubbed workflow config mock
+  */
+ private WorkflowConfig getWorkflowConfig(TaskDriver mockTaskDriver, ImmutableSet<String> jobNames,
+     TaskState taskState, TargetState targetState, String workflowName) {
+   // Context: current state of the workflow, looked up through the task driver by name.
+   WorkflowContext workflowContext = mock(WorkflowContext.class);
+   Mockito.when(workflowContext.getWorkflowState()).thenReturn(taskState);
+   Mockito.when(mockTaskDriver.getWorkflowContext(workflowName)).thenReturn(workflowContext);
+
+   // Config: the job DAG plus the requested target state.
+   JobDag jobDag = mock(JobDag.class);
+   Mockito.when(jobDag.getAllNodes()).thenReturn(jobNames);
+
+   WorkflowConfig workflowConfig = mock(WorkflowConfig.class);
+   Mockito.when(workflowConfig.getJobDag()).thenReturn(jobDag);
+   Mockito.when(workflowConfig.getTargetState()).thenReturn(targetState);
+   return workflowConfig;
+ }
+
+ /**
+  * Convenience overload of {@code getJobContext} for jobs in which every partition is assigned:
+  * the partition set is taken to be exactly the keys of {@code assignedParticipantMap}.
+  *
+  * @param mockTaskDriver task driver mock that will serve the job context
+  * @param assignedParticipantMap partition id to the participant it is assigned to
+  * @param jobName name under which the context is registered on the task driver
+  * @return the stubbed job context mock
+  */
+ private JobContext getJobContext(TaskDriver mockTaskDriver, Map<Integer, String> assignedParticipantMap,
+     String jobName) {
+   Set<Integer> assignedPartitions = assignedParticipantMap.keySet();
+   return getJobContext(mockTaskDriver, assignedParticipantMap, jobName, assignedPartitions);
+ }
+
+ /**
+  * Builds a mocked {@link JobContext} and registers it on the task driver under {@code jobName}.
+  *
+  * <p>The context reports {@code partitionSet} as its partitions and, for each entry of
+  * {@code assignedParticipantMap}, the participant assigned to that partition. Partitions that are
+  * in {@code partitionSet} but not in the map get no explicit stubbing.
+  *
+  * @param mockTaskDriver task driver mock that will serve the job context
+  * @param assignedParticipantMap partition id to the participant it is assigned to
+  * @param jobName name under which the context is registered on the task driver
+  * @param partitionSet all partition ids of the job
+  * @return the stubbed job context mock
+  */
+ private JobContext getJobContext(
+     TaskDriver mockTaskDriver,
+     Map<Integer, String> assignedParticipantMap,
+     String jobName,
+     Set<Integer> partitionSet) {
+   JobContext jobContext = mock(JobContext.class);
+   Mockito.when(jobContext.getPartitionSet()).thenReturn(ImmutableSet.copyOf(partitionSet));
+   assignedParticipantMap.forEach((partition, participant) ->
+       Mockito.when(jobContext.getAssignedParticipant(partition)).thenReturn(participant));
+   Mockito.when(mockTaskDriver.getJobContext(jobName)).thenReturn(jobContext);
+   return jobContext;
+ }
+
+ /**
+  * Verifies that {@code requestTargetNumberOfContainers} was invoked exactly once on
+  * {@code mockYarnService} with the expected in-use instances, and that the captured request
+  * bundle asks for {@code expectedNumberOfContainers} containers in total.
+  *
+  * <p>The bundle is captured into the caller-supplied {@code argument}, so the caller can make
+  * further assertions on {@code argument.getValue()} afterwards.
+  *
+  * @param argument captor that receives the request bundle passed to the YARN service
+  * @param mockYarnService YARN service mock to verify
+  * @param expectedNumberOfContainers expected total container count of the captured bundle
+  * @param expectedInUseInstances expected set of in-use instance names
+  */
+ private void assertContainerRequest(ArgumentCaptor<YarnContainerRequestBundle> argument,
+     YarnService mockYarnService, int expectedNumberOfContainers,
+     ImmutableSet<String> expectedInUseInstances) {
+   // Capture into the supplied captor; no new captor is created here.
+   Mockito.verify(mockYarnService, times(1))
+       .requestTargetNumberOfContainers(argument.capture(), eq(expectedInUseInstances));
+   Assert.assertEquals(argument.getValue().getTotalContainers(), expectedNumberOfContainers);
+ }
+ /**
+  * Overload of {@code assertContainerRequest} for callers that do not need to inspect the captured
+  * request bundle: a fresh captor is created and passed to the captor-taking overload.
+  *
+  * @param mockYarnService YARN service mock to verify
+  * @param expectedNumberOfContainers expected total container count of the request
+  * @param expectedInUseInstances expected set of in-use instance names
+  */
+ private void assertContainerRequest(YarnService mockYarnService, int expectedNumberOfContainers,
+     ImmutableSet<String> expectedInUseInstances) {
+   ArgumentCaptor<YarnContainerRequestBundle> throwawayCaptor =
+       ArgumentCaptor.forClass(YarnContainerRequestBundle.class);
+   assertContainerRequest(throwawayCaptor, mockYarnService, expectedNumberOfContainers, expectedInUseInstances);
}
private static class TestYarnAutoScalingRunnable extends
YarnAutoScalingManager.YarnAutoScalingRunnable {