jeongyooneo closed pull request #17: [NEMO-66] Move /tests/scheduler into /runtime/master/scheduler/test URL: https://github.com/apache/incubator-nemo/pull/17
This is a PR merged from a forked repository. Because GitHub hides the original diff of a pull request from a fork once it is merged, the diff is reproduced below for the sake of provenance: diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/BasicPullPolicy.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/BasicPullPolicy.java new file mode 100644 index 00000000..50d24664 --- /dev/null +++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/BasicPullPolicy.java @@ -0,0 +1,42 @@ +/* + * Copyright (C) 2017 Seoul National University + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package edu.snu.nemo.compiler.optimizer.policy; + +import edu.snu.nemo.compiler.optimizer.pass.compiletime.CompileTimePass; +import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.DefaultStagePartitioningPass; +import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.ScheduleGroupPass; +import edu.snu.nemo.runtime.common.optimizer.pass.runtime.RuntimePass; + +import java.util.ArrayList; +import java.util.List; + +/** + * Basic pull policy. 
+ */ +public final class BasicPullPolicy implements Policy { + @Override + public List<CompileTimePass> getCompileTimePasses() { + List<CompileTimePass> policy = new ArrayList<>(); + policy.add(new DefaultStagePartitioningPass()); + policy.add(new ScheduleGroupPass()); + return policy; + } + + @Override + public List<RuntimePass<?>> getRuntimePasses() { + return new ArrayList<>(); + } +} diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/BasicPushPolicy.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/BasicPushPolicy.java new file mode 100644 index 00000000..201cfa92 --- /dev/null +++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/BasicPushPolicy.java @@ -0,0 +1,44 @@ +/* + * Copyright (C) 2017 Seoul National University + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package edu.snu.nemo.compiler.optimizer.policy; + +import edu.snu.nemo.compiler.optimizer.pass.compiletime.CompileTimePass; +import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.DefaultStagePartitioningPass; +import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.ScheduleGroupPass; +import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.ShuffleEdgePushPass; +import edu.snu.nemo.runtime.common.optimizer.pass.runtime.RuntimePass; + +import java.util.ArrayList; +import java.util.List; + +/** + * Basic push policy. 
+ */ +public final class BasicPushPolicy implements Policy { + @Override + public List<CompileTimePass> getCompileTimePasses() { + List<CompileTimePass> policy = new ArrayList<>(); + policy.add(new DefaultStagePartitioningPass()); + policy.add(new ShuffleEdgePushPass()); + policy.add(new ScheduleGroupPass()); + return policy; + } + + @Override + public List<RuntimePass<?>> getRuntimePasses() { + return new ArrayList<>(); + } +} diff --git a/pom.xml b/pom.xml index ba0a0991..6f13f595 100644 --- a/pom.xml +++ b/pom.xml @@ -63,6 +63,7 @@ limitations under the License. <module>runtime/executor</module> <module>runtime/master</module> <module>runtime/driver</module> + <module>runtime/plangenerator</module> <module>tests</module> </modules> diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePass.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePass.java index cf48631c..8b750009 100644 --- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePass.java +++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePass.java @@ -106,7 +106,8 @@ public PhysicalPlan apply(final PhysicalPlan originalPlan, final Map<String, Lis .map(list -> list.stream() .map(pair -> pair.left()) .max(Integer::compareTo) - .orElseThrow(() -> new DynamicOptimizationException("Cannot find max hash value in a block."))) + .<DynamicOptimizationException>orElseThrow( + () -> new DynamicOptimizationException("Cannot find max hash value in a block."))) .max(Integer::compareTo) .orElseThrow(() -> new DynamicOptimizationException("Cannot find max hash value among blocks.")); diff --git a/runtime/master/pom.xml b/runtime/master/pom.xml index a797767e..c0a69b2b 100644 --- a/runtime/master/pom.xml +++ b/runtime/master/pom.xml @@ -46,6 +46,11 @@ limitations under the License. 
<artifactId>nemo-runtime-common</artifactId> <version>${project.version}</version> </dependency> + <dependency> + <groupId>edu.snu.nemo</groupId> + <artifactId>nemo-runtime-plangenerator</artifactId> + <version>${project.version}</version> + </dependency> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-core</artifactId> diff --git a/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/BatchSingleJobSchedulerTest.java b/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/BatchSingleJobSchedulerTest.java similarity index 66% rename from tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/BatchSingleJobSchedulerTest.java rename to runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/BatchSingleJobSchedulerTest.java index ab39a968..585cddaa 100644 --- a/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/BatchSingleJobSchedulerTest.java +++ b/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/BatchSingleJobSchedulerTest.java @@ -13,24 +13,11 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package edu.snu.nemo.tests.runtime.master.scheduler; +package edu.snu.nemo.runtime.master.scheduler; -import edu.snu.nemo.common.coder.Coder; import edu.snu.nemo.common.eventhandler.PubSubEventHandlerWrapper; -import edu.snu.nemo.common.ir.edge.IREdge; -import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty; -import edu.snu.nemo.common.ir.vertex.IRVertex; -import edu.snu.nemo.common.ir.vertex.OperatorVertex; import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty; -import edu.snu.nemo.common.ir.vertex.executionproperty.ParallelismProperty; -import edu.snu.nemo.compiler.frontend.beam.transform.DoTransform; -import edu.snu.nemo.common.ir.vertex.transform.Transform; -import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty; -import edu.snu.nemo.compiler.optimizer.CompiletimeOptimizer; -import edu.snu.nemo.compiler.optimizer.examples.EmptyComponents; import edu.snu.nemo.conf.JobConf; -import edu.snu.nemo.runtime.master.scheduler.ExecutorRegistry; -import edu.snu.nemo.tests.runtime.RuntimeTestUtil; import edu.snu.nemo.runtime.common.comm.ControlMessage; import edu.snu.nemo.runtime.common.message.MessageSender; import edu.snu.nemo.runtime.common.plan.physical.*; @@ -43,9 +30,7 @@ import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter; import edu.snu.nemo.runtime.master.resource.ResourceSpecification; import edu.snu.nemo.common.dag.DAG; -import edu.snu.nemo.common.dag.DAGBuilder; -import edu.snu.nemo.runtime.master.scheduler.*; -import edu.snu.nemo.tests.compiler.optimizer.policy.TestPolicy; +import edu.snu.nemo.runtime.plangenerator.TestPlanGenerator; import org.apache.reef.driver.context.ActiveContext; import org.apache.reef.tang.Injector; import org.apache.reef.tang.Tang; @@ -75,7 +60,6 @@ PubSubEventHandlerWrapper.class, UpdatePhysicalPlanEventHandler.class, MetricMessageHandler.class}) public final class BatchSingleJobSchedulerTest { private static final Logger LOG = 
LoggerFactory.getLogger(BatchSingleJobSchedulerTest.class.getName()); - private DAGBuilder<IRVertex, IREdge> irDAGBuilder; private Scheduler scheduler; private SchedulingPolicy schedulingPolicy; private SchedulerRunner schedulerRunner; @@ -86,7 +70,6 @@ private UpdatePhysicalPlanEventHandler updatePhysicalPlanEventHandler; private BlockManagerMaster blockManagerMaster = mock(BlockManagerMaster.class); private final MessageSender<ControlMessage.Message> mockMsgSender = mock(MessageSender.class); - private PhysicalPlanGenerator physicalPlanGenerator; private static final int EXECUTOR_CAPACITY = 20; @@ -98,7 +81,6 @@ public void setUp() throws Exception { final Injector injector = Tang.Factory.getTang().newInjector(); injector.bindVolatileParameter(JobConf.DAGDirectory.class, ""); - irDAGBuilder = initializeDAGBuilder(); executorRegistry = injector.getInstance(ExecutorRegistry.class); metricMessageHandler = mock(MetricMessageHandler.class); pendingTaskGroupCollection = new SingleJobTaskGroupCollection(); @@ -139,8 +121,6 @@ public void setUp() throws Exception { // Add storage nodes scheduler.onExecutorAdded(b1); scheduler.onExecutorAdded(b2); - - physicalPlanGenerator = injector.getInstance(PhysicalPlanGenerator.class); } /** @@ -149,9 +129,8 @@ public void setUp() throws Exception { */ @Test(timeout=10000) public void testPull() throws Exception { - final DAG<IRVertex, IREdge> pullIRDAG = CompiletimeOptimizer.optimize(irDAGBuilder.buildWithoutSourceSinkCheck(), - new TestPolicy(), ""); - scheduleAndCheckJobTermination(pullIRDAG); + scheduleAndCheckJobTermination( + TestPlanGenerator.generatePhysicalPlan(TestPlanGenerator.PlanType.TwoVerticesJoined, false)); } /** @@ -160,68 +139,20 @@ public void testPull() throws Exception { */ @Test(timeout=10000) public void testPush() throws Exception { - final DAG<IRVertex, IREdge> pushIRDAG = CompiletimeOptimizer.optimize(irDAGBuilder.buildWithoutSourceSinkCheck(), - new TestPolicy(true), ""); - 
scheduleAndCheckJobTermination(pushIRDAG); - } - - private DAGBuilder<IRVertex, IREdge> initializeDAGBuilder() { - final DAGBuilder<IRVertex, IREdge> dagBuilder = new DAGBuilder<>(); - - final Transform t = new EmptyComponents.EmptyTransform("empty"); - final IRVertex v1 = new OperatorVertex(t); - v1.setProperty(ParallelismProperty.of(1)); - v1.setProperty(ExecutorPlacementProperty.of(ExecutorPlacementProperty.COMPUTE)); - dagBuilder.addVertex(v1); - - final IRVertex v2 = new OperatorVertex(t); - v2.setProperty(ParallelismProperty.of(2)); - v2.setProperty(ExecutorPlacementProperty.of(ExecutorPlacementProperty.COMPUTE)); - dagBuilder.addVertex(v2); - - final IRVertex v3 = new OperatorVertex(t); - v3.setProperty(ParallelismProperty.of(3)); - v3.setProperty(ExecutorPlacementProperty.of(ExecutorPlacementProperty.COMPUTE)); - dagBuilder.addVertex(v3); - - final IRVertex v4 = new OperatorVertex(t); - v4.setProperty(ParallelismProperty.of(4)); - v4.setProperty(ExecutorPlacementProperty.of(ExecutorPlacementProperty.COMPUTE)); - dagBuilder.addVertex(v4); - - final IRVertex v5 = new OperatorVertex(new DoTransform(null, null)); - v5.setProperty(ParallelismProperty.of(5)); - v5.setProperty(ExecutorPlacementProperty.of(ExecutorPlacementProperty.TRANSIENT)); - dagBuilder.addVertex(v5); - - final IREdge e1 = new IREdge(DataCommunicationPatternProperty.Value.Shuffle, v1, v2, Coder.DUMMY_CODER); - dagBuilder.connectVertices(e1); - - final IREdge e2 = new IREdge(DataCommunicationPatternProperty.Value.Shuffle, v3, v2, Coder.DUMMY_CODER); - dagBuilder.connectVertices(e2); - - final IREdge e4 = new IREdge(DataCommunicationPatternProperty.Value.Shuffle, v2, v4, Coder.DUMMY_CODER); - dagBuilder.connectVertices(e4); - - final IREdge e5 = new IREdge(DataCommunicationPatternProperty.Value.Shuffle, v2, v5, Coder.DUMMY_CODER); - dagBuilder.connectVertices(e5); - - return dagBuilder; + scheduleAndCheckJobTermination( + 
TestPlanGenerator.generatePhysicalPlan(TestPlanGenerator.PlanType.TwoVerticesJoined, true)); } - private void scheduleAndCheckJobTermination(final DAG<IRVertex, IREdge> irDAG) throws InjectionException { - final DAG<PhysicalStage, PhysicalStageEdge> physicalDAG = irDAG.convert(physicalPlanGenerator); - - final PhysicalPlan plan = new PhysicalPlan("TestPlan", physicalDAG, physicalPlanGenerator.getTaskIRVertexMap()); + private void scheduleAndCheckJobTermination(final PhysicalPlan plan) throws InjectionException { final JobStateManager jobStateManager = new JobStateManager(plan, blockManagerMaster, metricMessageHandler, 1); scheduler.scheduleJob(plan, jobStateManager); // For each ScheduleGroup, test: // a) all stages in the ScheduleGroup enters the executing state // b) the stages of the next ScheduleGroup are scheduled after the stages of each ScheduleGroup are made "complete". - for (int i = 0; i < getNumScheduleGroups(irDAG); i++) { + for (int i = 0; i < getNumScheduleGroups(plan.getStageDAG()); i++) { final int scheduleGroupIdx = i; - final List<PhysicalStage> stages = filterStagesWithAScheduleGroupIndex(physicalDAG, scheduleGroupIdx); + final List<PhysicalStage> stages = filterStagesWithAScheduleGroupIndex(plan.getStageDAG(), scheduleGroupIdx); LOG.debug("Checking that all stages of ScheduleGroup {} enter the executing state", scheduleGroupIdx); stages.forEach(physicalStage -> { @@ -232,7 +163,7 @@ private void scheduleAndCheckJobTermination(final DAG<IRVertex, IREdge> irDAG) t }); stages.forEach(physicalStage -> { - RuntimeTestUtil.completeStage( + SchedulerTestUtil.completeStage( jobStateManager, scheduler, executorRegistry, physicalStage, MAGIC_SCHEDULE_ATTEMPT_INDEX); }); } @@ -258,10 +189,9 @@ private void scheduleAndCheckJobTermination(final DAG<IRVertex, IREdge> irDAG) t return sortedStages; } - private int getNumScheduleGroups(final DAG<IRVertex, IREdge> irDAG) { + private int getNumScheduleGroups(final DAG<PhysicalStage, PhysicalStageEdge> physicalDAG) 
{ final Set<Integer> scheduleGroupSet = new HashSet<>(); - irDAG.getVertices().forEach(irVertex -> - scheduleGroupSet.add((Integer) irVertex.getProperty(ExecutionProperty.Key.ScheduleGroupIndex))); + physicalDAG.getVertices().forEach(stage -> scheduleGroupSet.add(stage.getScheduleGroupIndex())); return scheduleGroupSet.size(); } } diff --git a/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicyTest.java b/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/ContainerTypeAwareSchedulingPolicyTest.java similarity index 98% rename from tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicyTest.java rename to runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/ContainerTypeAwareSchedulingPolicyTest.java index 1f9462a8..b95c2a83 100644 --- a/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicyTest.java +++ b/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/ContainerTypeAwareSchedulingPolicyTest.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package edu.snu.nemo.tests.runtime.master.scheduler; +package edu.snu.nemo.runtime.master.scheduler; import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty; import edu.snu.nemo.runtime.common.plan.physical.ScheduledTaskGroup; diff --git a/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/FaultToleranceTest.java b/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/FaultToleranceTest.java similarity index 72% rename from tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/FaultToleranceTest.java rename to runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/FaultToleranceTest.java index f6419247..bd171244 100644 --- a/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/FaultToleranceTest.java +++ b/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/FaultToleranceTest.java @@ -13,24 +13,13 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package edu.snu.nemo.tests.runtime.master.scheduler; +package edu.snu.nemo.runtime.master.scheduler; -import edu.snu.nemo.common.coder.Coder; -import edu.snu.nemo.common.dag.DAG; import edu.snu.nemo.common.dag.DAGBuilder; import edu.snu.nemo.common.eventhandler.PubSubEventHandlerWrapper; import edu.snu.nemo.common.ir.edge.IREdge; -import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty; import edu.snu.nemo.common.ir.vertex.IRVertex; -import edu.snu.nemo.common.ir.vertex.OperatorVertex; import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty; -import edu.snu.nemo.common.ir.vertex.executionproperty.ParallelismProperty; -import edu.snu.nemo.common.ir.vertex.transform.Transform; -import edu.snu.nemo.compiler.optimizer.CompiletimeOptimizer; -import edu.snu.nemo.compiler.optimizer.examples.EmptyComponents; -import edu.snu.nemo.conf.JobConf; -import edu.snu.nemo.runtime.master.scheduler.ExecutorRegistry; -import edu.snu.nemo.tests.runtime.RuntimeTestUtil; import edu.snu.nemo.runtime.common.comm.ControlMessage; import edu.snu.nemo.runtime.common.message.MessageSender; import edu.snu.nemo.runtime.common.plan.physical.*; @@ -41,8 +30,7 @@ import edu.snu.nemo.runtime.master.eventhandler.UpdatePhysicalPlanEventHandler; import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter; import edu.snu.nemo.runtime.master.resource.ResourceSpecification; -import edu.snu.nemo.runtime.master.scheduler.*; -import edu.snu.nemo.tests.compiler.optimizer.policy.TestPolicy; +import edu.snu.nemo.runtime.plangenerator.TestPlanGenerator; import org.apache.reef.driver.context.ActiveContext; import org.apache.reef.tang.Injector; import org.apache.reef.tang.Tang; @@ -89,7 +77,6 @@ private BlockManagerMaster blockManagerMaster = mock(BlockManagerMaster.class); private final MessageSender<ControlMessage.Message> mockMsgSender = mock(MessageSender.class); private final ExecutorService serExecutorService = Executors.newSingleThreadExecutor(); 
- private PhysicalPlanGenerator physicalPlanGenerator; private static final int MAX_SCHEDULE_ATTEMPT = 3; @@ -101,9 +88,6 @@ public void setUp() throws Exception { pubSubEventHandler = mock(PubSubEventHandlerWrapper.class); updatePhysicalPlanEventHandler = mock(UpdatePhysicalPlanEventHandler.class); - final Injector injector = Tang.Factory.getTang().newInjector(); - injector.bindVolatileParameter(JobConf.DAGDirectory.class, ""); - physicalPlanGenerator = injector.getInstance(PhysicalPlanGenerator.class); } private void setUpExecutors(final Collection<ExecutorRepresenter> executors, @@ -129,53 +113,6 @@ private void setUpExecutors(final Collection<ExecutorRepresenter> executors, } } - private PhysicalPlan buildPlan() throws Exception { - // Build DAG - final Transform t = new EmptyComponents.EmptyTransform("empty"); - final IRVertex v1 = new OperatorVertex(t); - v1.setProperty(ParallelismProperty.of(3)); - v1.setProperty(ExecutorPlacementProperty.of(ExecutorPlacementProperty.COMPUTE)); - irDAGBuilder.addVertex(v1); - - final IRVertex v2 = new OperatorVertex(t); - v2.setProperty(ParallelismProperty.of(2)); - v2.setProperty(ExecutorPlacementProperty.of(ExecutorPlacementProperty.COMPUTE)); - irDAGBuilder.addVertex(v2); - - final IRVertex v3 = new OperatorVertex(t); - v3.setProperty(ParallelismProperty.of(3)); - v3.setProperty(ExecutorPlacementProperty.of(ExecutorPlacementProperty.COMPUTE)); - irDAGBuilder.addVertex(v3); - - final IRVertex v4 = new OperatorVertex(t); - v4.setProperty(ParallelismProperty.of(2)); - v4.setProperty(ExecutorPlacementProperty.of(ExecutorPlacementProperty.COMPUTE)); - irDAGBuilder.addVertex(v4); - - final IRVertex v5 = new OperatorVertex(t); - v5.setProperty(ParallelismProperty.of(2)); - v5.setProperty(ExecutorPlacementProperty.of(ExecutorPlacementProperty.COMPUTE)); - irDAGBuilder.addVertex(v5); - - final IREdge e1 = new IREdge(DataCommunicationPatternProperty.Value.Shuffle, v1, v2, Coder.DUMMY_CODER); - irDAGBuilder.connectVertices(e1); - - 
final IREdge e2 = new IREdge(DataCommunicationPatternProperty.Value.Shuffle, v3, v2, Coder.DUMMY_CODER); - irDAGBuilder.connectVertices(e2); - - final IREdge e3 = new IREdge(DataCommunicationPatternProperty.Value.Shuffle, v2, v4, Coder.DUMMY_CODER); - irDAGBuilder.connectVertices(e3); - - final IREdge e4 = new IREdge(DataCommunicationPatternProperty.Value.OneToOne, v4, v5, Coder.DUMMY_CODER); - irDAGBuilder.connectVertices(e4); - - final DAG<IRVertex, IREdge> irDAG = - CompiletimeOptimizer.optimize(irDAGBuilder.buildWithoutSourceSinkCheck(), new TestPolicy(), ""); - final DAG<PhysicalStage, PhysicalStageEdge> physicalDAG = irDAG.convert(physicalPlanGenerator); - - return new PhysicalPlan("TestPlan", physicalDAG, physicalPlanGenerator.getTaskIRVertexMap()); - } - /** * Tests fault tolerance after a container removal. */ @@ -197,7 +134,9 @@ public void testContainerRemoval() throws Exception { executors.add(a3); setUpExecutors(executors, true); - final PhysicalPlan plan = buildPlan(); + final PhysicalPlan plan = + TestPlanGenerator.generatePhysicalPlan(TestPlanGenerator.PlanType.TwoVerticesJoined, false); + final JobStateManager jobStateManager = new JobStateManager(plan, blockManagerMaster, metricMessageHandler, MAX_SCHEDULE_ATTEMPT); scheduler.scheduleJob(plan, jobStateManager); @@ -208,16 +147,16 @@ public void testContainerRemoval() throws Exception { if (stage.getScheduleGroupIndex() == 0) { // There are 3 executors, each of capacity 2, and there are 6 TaskGroups in ScheduleGroup 0. 
- RuntimeTestUtil.mockSchedulerRunner(pendingTaskGroupCollection, schedulingPolicy, jobStateManager, + SchedulerTestUtil.mockSchedulerRunner(pendingTaskGroupCollection, schedulingPolicy, jobStateManager, executorRegistry, false); assertTrue(pendingTaskGroupCollection.isEmpty()); stage.getTaskGroupIds().forEach(taskGroupId -> - RuntimeTestUtil.sendTaskGroupStateEventToScheduler(scheduler, executorRegistry, - taskGroupId, TaskGroupState.State.COMPLETE, 1)); + SchedulerTestUtil.sendTaskGroupStateEventToScheduler(scheduler, executorRegistry, + taskGroupId, TaskGroupState.State.COMPLETE, 1)); } else if (stage.getScheduleGroupIndex() == 1) { scheduler.onExecutorRemoved("a3"); // There are 2 executors, each of capacity 2, and there are 2 TaskGroups in ScheduleGroup 1. - RuntimeTestUtil.mockSchedulerRunner(pendingTaskGroupCollection, schedulingPolicy, jobStateManager, + SchedulerTestUtil.mockSchedulerRunner(pendingTaskGroupCollection, schedulingPolicy, jobStateManager, executorRegistry, false); // Due to round robin scheduling, "a2" is assured to have a running TaskGroup. @@ -228,16 +167,16 @@ public void testContainerRemoval() throws Exception { } assertEquals(jobStateManager.getAttemptCountForStage(stage.getId()), 2); - RuntimeTestUtil.mockSchedulerRunner(pendingTaskGroupCollection, schedulingPolicy, jobStateManager, + SchedulerTestUtil.mockSchedulerRunner(pendingTaskGroupCollection, schedulingPolicy, jobStateManager, executorRegistry, false); assertTrue(pendingTaskGroupCollection.isEmpty()); stage.getTaskGroupIds().forEach(taskGroupId -> - RuntimeTestUtil.sendTaskGroupStateEventToScheduler(scheduler, executorRegistry, - taskGroupId, TaskGroupState.State.COMPLETE, 1)); + SchedulerTestUtil.sendTaskGroupStateEventToScheduler(scheduler, executorRegistry, + taskGroupId, TaskGroupState.State.COMPLETE, 1)); } else { // There are 1 executors, each of capacity 2, and there are 2 TaskGroups in ScheduleGroup 2. 
// Schedule only the first TaskGroup - RuntimeTestUtil.mockSchedulerRunner(pendingTaskGroupCollection, schedulingPolicy, jobStateManager, + SchedulerTestUtil.mockSchedulerRunner(pendingTaskGroupCollection, schedulingPolicy, jobStateManager, executorRegistry, true); } } @@ -264,7 +203,8 @@ public void testOutputFailure() throws Exception { executors.add(a3); setUpExecutors(executors, true); - final PhysicalPlan plan = buildPlan(); + final PhysicalPlan plan = + TestPlanGenerator.generatePhysicalPlan(TestPlanGenerator.PlanType.TwoVerticesJoined, false); final JobStateManager jobStateManager = new JobStateManager(plan, blockManagerMaster, metricMessageHandler, MAX_SCHEDULE_ATTEMPT); scheduler.scheduleJob(plan, jobStateManager); @@ -275,21 +215,21 @@ public void testOutputFailure() throws Exception { if (stage.getScheduleGroupIndex() == 0) { // There are 3 executors, each of capacity 2, and there are 6 TaskGroups in ScheduleGroup 0. - RuntimeTestUtil.mockSchedulerRunner(pendingTaskGroupCollection, schedulingPolicy, jobStateManager, + SchedulerTestUtil.mockSchedulerRunner(pendingTaskGroupCollection, schedulingPolicy, jobStateManager, executorRegistry, false); assertTrue(pendingTaskGroupCollection.isEmpty()); stage.getTaskGroupIds().forEach(taskGroupId -> - RuntimeTestUtil.sendTaskGroupStateEventToScheduler(scheduler, executorRegistry, - taskGroupId, TaskGroupState.State.COMPLETE, 1)); + SchedulerTestUtil.sendTaskGroupStateEventToScheduler(scheduler, executorRegistry, + taskGroupId, TaskGroupState.State.COMPLETE, 1)); } else if (stage.getScheduleGroupIndex() == 1) { // There are 3 executors, each of capacity 2, and there are 2 TaskGroups in ScheduleGroup 1. 
- RuntimeTestUtil.mockSchedulerRunner(pendingTaskGroupCollection, schedulingPolicy, jobStateManager, + SchedulerTestUtil.mockSchedulerRunner(pendingTaskGroupCollection, schedulingPolicy, jobStateManager, executorRegistry, false); assertTrue(pendingTaskGroupCollection.isEmpty()); stage.getTaskGroupIds().forEach(taskGroupId -> - RuntimeTestUtil.sendTaskGroupStateEventToScheduler(scheduler, executorRegistry, - taskGroupId, TaskGroupState.State.FAILED_RECOVERABLE, 1, - TaskGroupState.RecoverableFailureCause.OUTPUT_WRITE_FAILURE)); + SchedulerTestUtil.sendTaskGroupStateEventToScheduler(scheduler, executorRegistry, + taskGroupId, TaskGroupState.State.FAILED_RECOVERABLE, 1, + TaskGroupState.RecoverableFailureCause.OUTPUT_WRITE_FAILURE)); while (jobStateManager.getStageState(stage.getId()).getStateMachine().getCurrentState() != EXECUTING) { @@ -326,7 +266,8 @@ public void testInputReadFailure() throws Exception { executors.add(a3); setUpExecutors(executors, true); - final PhysicalPlan plan = buildPlan(); + final PhysicalPlan plan = + TestPlanGenerator.generatePhysicalPlan(TestPlanGenerator.PlanType.TwoVerticesJoined, false); final JobStateManager jobStateManager = new JobStateManager(plan, blockManagerMaster, metricMessageHandler, MAX_SCHEDULE_ATTEMPT); scheduler.scheduleJob(plan, jobStateManager); @@ -337,21 +278,21 @@ public void testInputReadFailure() throws Exception { if (stage.getScheduleGroupIndex() == 0) { // There are 3 executors, each of capacity 2, and there are 6 TaskGroups in ScheduleGroup 0. 
- RuntimeTestUtil.mockSchedulerRunner(pendingTaskGroupCollection, schedulingPolicy, jobStateManager, + SchedulerTestUtil.mockSchedulerRunner(pendingTaskGroupCollection, schedulingPolicy, jobStateManager, executorRegistry, false); assertTrue(pendingTaskGroupCollection.isEmpty()); stage.getTaskGroupIds().forEach(taskGroupId -> - RuntimeTestUtil.sendTaskGroupStateEventToScheduler(scheduler, executorRegistry, - taskGroupId, TaskGroupState.State.COMPLETE, 1)); + SchedulerTestUtil.sendTaskGroupStateEventToScheduler(scheduler, executorRegistry, + taskGroupId, TaskGroupState.State.COMPLETE, 1)); } else if (stage.getScheduleGroupIndex() == 1) { // There are 3 executors, each of capacity 2, and there are 2 TaskGroups in ScheduleGroup 1. - RuntimeTestUtil.mockSchedulerRunner(pendingTaskGroupCollection, schedulingPolicy, jobStateManager, + SchedulerTestUtil.mockSchedulerRunner(pendingTaskGroupCollection, schedulingPolicy, jobStateManager, executorRegistry, false); stage.getTaskGroupIds().forEach(taskGroupId -> - RuntimeTestUtil.sendTaskGroupStateEventToScheduler(scheduler, executorRegistry, - taskGroupId, TaskGroupState.State.FAILED_RECOVERABLE, 1, - TaskGroupState.RecoverableFailureCause.INPUT_READ_FAILURE)); + SchedulerTestUtil.sendTaskGroupStateEventToScheduler(scheduler, executorRegistry, + taskGroupId, TaskGroupState.State.FAILED_RECOVERABLE, 1, + TaskGroupState.RecoverableFailureCause.INPUT_READ_FAILURE)); while (jobStateManager.getStageState(stage.getId()).getStateMachine().getCurrentState() != EXECUTING) { @@ -371,7 +312,7 @@ public void testInputReadFailure() throws Exception { */ @Test(timeout=10000) public void testTaskGroupReexecutionForFailure() throws Exception { - final ActiveContext activeContext = mock(ActiveContext.class); + final ActiveContext activeContext = mock(ActiveContext.class); Mockito.doThrow(new RuntimeException()).when(activeContext).close(); final ResourceSpecification computeSpec = new ResourceSpecification(ExecutorPlacementProperty.COMPUTE, 2, 
0); @@ -388,7 +329,10 @@ public void testTaskGroupReexecutionForFailure() throws Exception { setUpExecutors(executors, false); - final PhysicalPlan plan = buildPlan(); + final PhysicalPlan plan = + TestPlanGenerator.generatePhysicalPlan(TestPlanGenerator.PlanType.TwoVerticesJoined, false); + + final JobStateManager jobStateManager = new JobStateManager(plan, blockManagerMaster, metricMessageHandler, MAX_SCHEDULE_ATTEMPT); @@ -403,11 +347,11 @@ public void testTaskGroupReexecutionForFailure() throws Exception { final Set<String> a3RunningTaskGroups = new HashSet<>(a3.getRunningTaskGroups()); a1RunningTaskGroups.forEach(taskGroupId -> - RuntimeTestUtil.sendTaskGroupStateEventToScheduler(scheduler, executorRegistry, + SchedulerTestUtil.sendTaskGroupStateEventToScheduler(scheduler, executorRegistry, taskGroupId, TaskGroupState.State.COMPLETE, 1)); a3RunningTaskGroups.forEach(taskGroupId -> - RuntimeTestUtil.sendTaskGroupStateEventToScheduler(scheduler, executorRegistry, + SchedulerTestUtil.sendTaskGroupStateEventToScheduler(scheduler, executorRegistry, taskGroupId, TaskGroupState.State.COMPLETE, 1)); } } diff --git a/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/FreeSlotSchedulingPolicyTest.java b/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/FreeSlotSchedulingPolicyTest.java similarity index 98% rename from tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/FreeSlotSchedulingPolicyTest.java rename to runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/FreeSlotSchedulingPolicyTest.java index 5308ab6a..162a1402 100644 --- a/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/FreeSlotSchedulingPolicyTest.java +++ b/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/FreeSlotSchedulingPolicyTest.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package edu.snu.nemo.tests.runtime.master.scheduler; +package edu.snu.nemo.runtime.master.scheduler; import edu.snu.nemo.runtime.common.plan.physical.ScheduledTaskGroup; import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter; diff --git a/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/RoundRobinSchedulingPolicyTest.java b/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/RoundRobinSchedulingPolicyTest.java similarity index 93% rename from tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/RoundRobinSchedulingPolicyTest.java rename to runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/RoundRobinSchedulingPolicyTest.java index 6061fc06..04d30759 100644 --- a/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/RoundRobinSchedulingPolicyTest.java +++ b/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/RoundRobinSchedulingPolicyTest.java @@ -13,12 +13,10 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package edu.snu.nemo.tests.runtime.master.scheduler; +package edu.snu.nemo.runtime.master.scheduler; import edu.snu.nemo.runtime.common.plan.physical.ScheduledTaskGroup; import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter; -import edu.snu.nemo.runtime.master.scheduler.RoundRobinSchedulingPolicy; -import edu.snu.nemo.runtime.master.scheduler.SchedulingPolicy; import org.junit.Test; import org.junit.runner.RunWith; import org.powermock.core.classloader.annotations.PrepareForTest; diff --git a/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/SchedulerTestUtil.java b/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/SchedulerTestUtil.java new file mode 100644 index 00000000..e3b1abba --- /dev/null +++ b/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/SchedulerTestUtil.java @@ -0,0 +1,148 @@ +/* + * Copyright (C) 2017 Seoul National University + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package edu.snu.nemo.runtime.master.scheduler; + +import edu.snu.nemo.runtime.common.plan.physical.PhysicalStage; +import edu.snu.nemo.runtime.common.plan.physical.ScheduledTaskGroup; +import edu.snu.nemo.runtime.common.state.StageState; +import edu.snu.nemo.runtime.common.state.TaskGroupState; +import edu.snu.nemo.runtime.master.JobStateManager; +import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter; + +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Utility class for runtime unit tests. + */ +public final class SchedulerTestUtil { + /** + * Complete the stage by completing all of its TaskGroups. + * @param jobStateManager for the submitted job. + * @param scheduler for the submitted job. + * @param executorRegistry provides executor representers + * @param physicalStage for which the states should be marked as complete. + */ + public static void completeStage(final JobStateManager jobStateManager, + final Scheduler scheduler, + final ExecutorRegistry executorRegistry, + final PhysicalStage physicalStage, + final int attemptIdx) { + // Loop until the stage completes. + while (true) { + final Enum stageState = jobStateManager.getStageState(physicalStage.getId()).getStateMachine().getCurrentState(); + if (StageState.State.COMPLETE == stageState) { + // Stage has completed, so we break out of the loop. + break; + } else if (StageState.State.EXECUTING == stageState) { + physicalStage.getTaskGroupIds().forEach(taskGroupId -> { + final Enum tgState = jobStateManager.getTaskGroupState(taskGroupId).getStateMachine().getCurrentState(); + if (TaskGroupState.State.EXECUTING == tgState) { + sendTaskGroupStateEventToScheduler(scheduler, executorRegistry, taskGroupId, + TaskGroupState.State.COMPLETE, attemptIdx, null); + } else if (TaskGroupState.State.READY == tgState || TaskGroupState.State.COMPLETE == tgState) { + // Skip READY (try in the next loop and see if it becomes EXECUTING) and COMPLETE. 
+ } else { + throw new IllegalStateException(tgState.toString()); + } + }); + } else if (StageState.State.READY == stageState) { + // Skip and retry in the next loop. + } else { + throw new IllegalStateException(stageState.toString()); + } + } + } + + /** + * Sends task group state change event to scheduler. + * This replaces executor's task group completion messages for testing purposes. + * @param scheduler for the submitted job. + * @param executorRegistry provides executor representers + * @param taskGroupId for the task group to change the state. + * @param newState for the task group. + * @param cause in the case of a recoverable failure. + */ + public static void sendTaskGroupStateEventToScheduler(final Scheduler scheduler, + final ExecutorRegistry executorRegistry, + final String taskGroupId, + final TaskGroupState.State newState, + final int attemptIdx, + final TaskGroupState.RecoverableFailureCause cause) { + ExecutorRepresenter scheduledExecutor; + do { + scheduledExecutor = findExecutorForTaskGroup(executorRegistry, taskGroupId); + } while (scheduledExecutor == null); + + scheduler.onTaskGroupStateChanged(scheduledExecutor.getExecutorId(), taskGroupId, + newState, attemptIdx, null, cause); + } + + public static void sendTaskGroupStateEventToScheduler(final Scheduler scheduler, + final ExecutorRegistry executorRegistry, + final String taskGroupId, + final TaskGroupState.State newState, + final int attemptIdx) { + sendTaskGroupStateEventToScheduler(scheduler, executorRegistry, taskGroupId, newState, attemptIdx, null); + } + + public static void mockSchedulerRunner(final PendingTaskGroupCollection pendingTaskGroupCollection, + final SchedulingPolicy schedulingPolicy, + final JobStateManager jobStateManager, + final ExecutorRegistry executorRegistry, + final boolean isPartialSchedule) { + while (!pendingTaskGroupCollection.isEmpty()) { + final ScheduledTaskGroup taskGroupToSchedule = pendingTaskGroupCollection.remove( + 
pendingTaskGroupCollection.peekSchedulableTaskGroups().get().iterator().next().getTaskGroupId()); + + final Set<ExecutorRepresenter> runningExecutorRepresenter = + executorRegistry.getRunningExecutorIds().stream() + .map(executorId -> executorRegistry.getExecutorRepresenter(executorId)) + .collect(Collectors.toSet()); + final Set<ExecutorRepresenter> candidateExecutors = + schedulingPolicy.filterExecutorRepresenters(runningExecutorRepresenter, taskGroupToSchedule); + if (candidateExecutors.size() > 0) { + jobStateManager.onTaskGroupStateChanged(taskGroupToSchedule.getTaskGroupId(), + TaskGroupState.State.EXECUTING); + final ExecutorRepresenter executor = candidateExecutors.stream().findFirst().get(); + executor.onTaskGroupScheduled(taskGroupToSchedule); + } + + // Schedule only the first task group. + if (isPartialSchedule) { + break; + } + } + } + + /** + * Retrieves the executor to which the given task group was scheduled. + * @param taskGroupId of the task group to search. + * @param executorRegistry provides executor representers + * @return the {@link ExecutorRepresenter} of the executor the task group was scheduled to. 
+ */ + private static ExecutorRepresenter findExecutorForTaskGroup(final ExecutorRegistry executorRegistry, + final String taskGroupId) { + for (final String executorId : executorRegistry.getRunningExecutorIds()) { + final ExecutorRepresenter executor = executorRegistry.getRunningExecutorRepresenter(executorId); + if (executor.getRunningTaskGroups().contains(taskGroupId) + || executor.getCompleteTaskGroups().contains(taskGroupId)) { + return executor; + } + } + return null; + } +} diff --git a/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/SingleTaskGroupQueueTest.java b/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/SingleTaskGroupQueueTest.java similarity index 59% rename from tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/SingleTaskGroupQueueTest.java rename to runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/SingleTaskGroupQueueTest.java index 41bdb4dc..2e86dbbf 100644 --- a/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/SingleTaskGroupQueueTest.java +++ b/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/SingleTaskGroupQueueTest.java @@ -13,26 +13,11 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package edu.snu.nemo.tests.runtime.master.scheduler; - -import edu.snu.nemo.common.coder.Coder; -import edu.snu.nemo.common.dag.DAG; -import edu.snu.nemo.common.dag.DAGBuilder; -import edu.snu.nemo.common.ir.vertex.transform.Transform; -import edu.snu.nemo.common.ir.edge.IREdge; -import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty; -import edu.snu.nemo.common.ir.vertex.IRVertex; -import edu.snu.nemo.common.ir.vertex.OperatorVertex; -import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty; -import edu.snu.nemo.common.ir.vertex.executionproperty.ParallelismProperty; -import edu.snu.nemo.compiler.optimizer.CompiletimeOptimizer; -import edu.snu.nemo.conf.JobConf; +package edu.snu.nemo.runtime.master.scheduler; + import edu.snu.nemo.runtime.common.RuntimeIdGenerator; import edu.snu.nemo.runtime.common.plan.physical.*; -import edu.snu.nemo.runtime.master.scheduler.SingleJobTaskGroupCollection; -import edu.snu.nemo.tests.compiler.optimizer.policy.TestPolicy; -import org.apache.reef.tang.Injector; -import org.apache.reef.tang.Tang; +import edu.snu.nemo.runtime.plangenerator.TestPlanGenerator; import org.junit.Before; import org.junit.Test; @@ -46,15 +31,12 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.mock; /** * Tests {@link SingleJobTaskGroupCollection}. */ public final class SingleTaskGroupQueueTest { - private DAGBuilder<IRVertex, IREdge> irDAGBuilder; private SingleJobTaskGroupCollection pendingTaskGroupPriorityQueue; - private PhysicalPlanGenerator physicalPlanGenerator; /** * To be used for a thread pool to execute task groups. 
@@ -63,13 +45,8 @@ @Before public void setUp() throws Exception{ - irDAGBuilder = new DAGBuilder<>(); pendingTaskGroupPriorityQueue = new SingleJobTaskGroupCollection(); executorService = Executors.newFixedThreadPool(2); - - final Injector injector = Tang.Factory.getTang().newInjector(); - injector.bindVolatileParameter(JobConf.DAGDirectory.class, ""); - physicalPlanGenerator = injector.getInstance(PhysicalPlanGenerator.class); } /** @@ -78,37 +55,11 @@ public void setUp() throws Exception{ */ @Test public void testPushPriority() throws Exception { - final Transform t = mock(Transform.class); - final IRVertex v1 = new OperatorVertex(t); - v1.setProperty(ParallelismProperty.of(3)); - v1.setProperty(ExecutorPlacementProperty.of(ExecutorPlacementProperty.COMPUTE)); - irDAGBuilder.addVertex(v1); - - final IRVertex v2 = new OperatorVertex(t); - v2.setProperty(ParallelismProperty.of(2)); - v2.setProperty(ExecutorPlacementProperty.of(ExecutorPlacementProperty.COMPUTE)); - irDAGBuilder.addVertex(v2); - - final IRVertex v3 = new OperatorVertex(t); - v3.setProperty(ParallelismProperty.of(2)); - v3.setProperty(ExecutorPlacementProperty.of(ExecutorPlacementProperty.COMPUTE)); - irDAGBuilder.addVertex(v3); - - final IREdge e1 = new IREdge(DataCommunicationPatternProperty.Value.Shuffle, v1, v2, Coder.DUMMY_CODER); - irDAGBuilder.connectVertices(e1); - - final IREdge e2 = new IREdge(DataCommunicationPatternProperty.Value.OneToOne, v2, v3, Coder.DUMMY_CODER); - irDAGBuilder.connectVertices(e2); - - final DAG<IRVertex, IREdge> irDAG = CompiletimeOptimizer.optimize(irDAGBuilder.buildWithoutSourceSinkCheck(), - new TestPolicy(true), ""); + final PhysicalPlan physicalPlan = + TestPlanGenerator.generatePhysicalPlan(TestPlanGenerator.PlanType.ThreeSequentialVertices, true); - final DAG<PhysicalStage, PhysicalStageEdge> physicalDAG = irDAG.convert(physicalPlanGenerator); - - pendingTaskGroupPriorityQueue.onJobScheduled( - new PhysicalPlan("TestPlan", physicalDAG, 
physicalPlanGenerator.getTaskIRVertexMap())); - - final List<PhysicalStage> dagOf2Stages = physicalDAG.getTopologicalSort(); + pendingTaskGroupPriorityQueue.onJobScheduled(physicalPlan); + final List<PhysicalStage> dagOf2Stages = physicalPlan.getStageDAG().getTopologicalSort(); // Make sure that ScheduleGroups have been assigned to satisfy PendingPQ's requirements. assertEquals(dagOf2Stages.get(0).getScheduleGroupIndex(), dagOf2Stages.get(1).getScheduleGroupIndex()); @@ -161,37 +112,10 @@ public void testPushPriority() throws Exception { */ @Test public void testPullPriority() throws Exception { - final Transform t = mock(Transform.class); - final IRVertex v1 = new OperatorVertex(t); - v1.setProperty(ParallelismProperty.of(3)); - v1.setProperty(ExecutorPlacementProperty.of(ExecutorPlacementProperty.COMPUTE)); - irDAGBuilder.addVertex(v1); - - final IRVertex v2 = new OperatorVertex(t); - v2.setProperty(ParallelismProperty.of(2)); - v2.setProperty(ExecutorPlacementProperty.of(ExecutorPlacementProperty.COMPUTE)); - irDAGBuilder.addVertex(v2); - - final IRVertex v3 = new OperatorVertex(t); - v3.setProperty(ParallelismProperty.of(2)); - v3.setProperty(ExecutorPlacementProperty.of(ExecutorPlacementProperty.COMPUTE)); - irDAGBuilder.addVertex(v3); - - final IREdge e1 = new IREdge(DataCommunicationPatternProperty.Value.Shuffle, v1, v2, Coder.DUMMY_CODER); - irDAGBuilder.connectVertices(e1); - - final IREdge e2 = new IREdge(DataCommunicationPatternProperty.Value.OneToOne, v2, v3, Coder.DUMMY_CODER); - irDAGBuilder.connectVertices(e2); - - final DAG<IRVertex, IREdge> irDAG = CompiletimeOptimizer.optimize(irDAGBuilder.buildWithoutSourceSinkCheck(), - new TestPolicy(), ""); - - final DAG<PhysicalStage, PhysicalStageEdge> physicalDAG = irDAG.convert(physicalPlanGenerator); - - pendingTaskGroupPriorityQueue.onJobScheduled( - new PhysicalPlan("TestPlan", physicalDAG, physicalPlanGenerator.getTaskIRVertexMap())); - - final List<PhysicalStage> dagOf2Stages = 
physicalDAG.getTopologicalSort(); + final PhysicalPlan physicalPlan = + TestPlanGenerator.generatePhysicalPlan(TestPlanGenerator.PlanType.ThreeSequentialVertices, false); + pendingTaskGroupPriorityQueue.onJobScheduled(physicalPlan); + final List<PhysicalStage> dagOf2Stages = physicalPlan.getStageDAG().getTopologicalSort(); // Make sure that ScheduleGroups have been assigned to satisfy PendingPQ's requirements. assertEquals(dagOf2Stages.get(0).getScheduleGroupIndex(), 0); @@ -241,37 +165,10 @@ public void testPullPriority() throws Exception { */ @Test public void testWithDifferentContainerType() throws Exception { - final Transform t = mock(Transform.class); - final IRVertex v1 = new OperatorVertex(t); - v1.setProperty(ParallelismProperty.of(3)); - v1.setProperty(ExecutorPlacementProperty.of(ExecutorPlacementProperty.COMPUTE)); - irDAGBuilder.addVertex(v1); - - final IRVertex v2 = new OperatorVertex(t); - v2.setProperty(ParallelismProperty.of(2)); - v2.setProperty(ExecutorPlacementProperty.of(ExecutorPlacementProperty.TRANSIENT)); - irDAGBuilder.addVertex(v2); - - final IRVertex v3 = new OperatorVertex(t); - v3.setProperty(ParallelismProperty.of(2)); - v3.setProperty(ExecutorPlacementProperty.of(ExecutorPlacementProperty.TRANSIENT)); - irDAGBuilder.addVertex(v3); - - final IREdge e1 = new IREdge(DataCommunicationPatternProperty.Value.Shuffle, v1, v2, Coder.DUMMY_CODER); - irDAGBuilder.connectVertices(e1); - - final IREdge e2 = new IREdge(DataCommunicationPatternProperty.Value.OneToOne, v2, v3, Coder.DUMMY_CODER); - irDAGBuilder.connectVertices(e2); - - final DAG<IRVertex, IREdge> irDAG = CompiletimeOptimizer.optimize(irDAGBuilder.buildWithoutSourceSinkCheck(), - new TestPolicy(true), ""); - - final DAG<PhysicalStage, PhysicalStageEdge> physicalDAG = irDAG.convert(physicalPlanGenerator); - - pendingTaskGroupPriorityQueue.onJobScheduled( - new PhysicalPlan("TestPlan", physicalDAG, physicalPlanGenerator.getTaskIRVertexMap())); - - final List<PhysicalStage> dagOf2Stages 
= physicalDAG.getTopologicalSort(); + final PhysicalPlan physicalPlan = TestPlanGenerator.generatePhysicalPlan( + TestPlanGenerator.PlanType.ThreeSequentialVerticesWithDifferentContainerTypes, true); + pendingTaskGroupPriorityQueue.onJobScheduled(physicalPlan); + final List<PhysicalStage> dagOf2Stages = physicalPlan.getStageDAG().getTopologicalSort(); // Make sure that ScheduleGroups have been assigned to satisfy PendingPQ's requirements. assertEquals(dagOf2Stages.get(0).getScheduleGroupIndex(), dagOf2Stages.get(1).getScheduleGroupIndex()); diff --git a/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/SourceLocationAwareSchedulingPolicyTest.java b/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/SourceLocationAwareSchedulingPolicyTest.java similarity index 97% rename from tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/SourceLocationAwareSchedulingPolicyTest.java rename to runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/SourceLocationAwareSchedulingPolicyTest.java index 810bb313..6d5fc43a 100644 --- a/tests/src/test/java/edu/snu/nemo/tests/runtime/master/scheduler/SourceLocationAwareSchedulingPolicyTest.java +++ b/runtime/master/src/test/java/edu.snu.nemo.runtime.master/scheduler/SourceLocationAwareSchedulingPolicyTest.java @@ -13,13 +13,11 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package edu.snu.nemo.tests.runtime.master.scheduler; +package edu.snu.nemo.runtime.master.scheduler; import edu.snu.nemo.runtime.common.plan.physical.ScheduledTaskGroup; import edu.snu.nemo.common.ir.Readable; import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter; -import edu.snu.nemo.runtime.master.scheduler.SchedulingPolicy; -import edu.snu.nemo.runtime.master.scheduler.SourceLocationAwareSchedulingPolicy; import org.junit.Test; import org.junit.runner.RunWith; import org.powermock.core.classloader.annotations.PrepareForTest; diff --git a/runtime/plangenerator/pom.xml b/runtime/plangenerator/pom.xml new file mode 100644 index 00000000..26ced4fc --- /dev/null +++ b/runtime/plangenerator/pom.xml @@ -0,0 +1,50 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Copyright (C) 2017 Seoul National University +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>edu.snu.nemo</groupId> + <artifactId>nemo-project</artifactId> + <version>0.1-SNAPSHOT</version> + <relativePath>../../</relativePath> + </parent> + + <artifactId>nemo-runtime-plangenerator</artifactId> + <name>Nemo Runtime Test Plans</name> + + <repositories> + <repository> + <id>Bundled Maven Repository</id> + <url>file://${basedir}/../../common/src/main/resources/repository</url> + </repository> + </repositories> + + <dependencies> + <dependency> + <groupId>edu.snu.nemo</groupId> + <artifactId>nemo-runtime-common</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>edu.snu.nemo</groupId> + <artifactId>nemo-compiler-optimizer</artifactId> + <version>${project.version}</version> + </dependency> + </dependencies> +</project> diff --git a/runtime/plangenerator/src/main/java/edu/snu/nemo/runtime/plangenerator/TestPlanGenerator.java b/runtime/plangenerator/src/main/java/edu/snu/nemo/runtime/plangenerator/TestPlanGenerator.java new file mode 100644 index 00000000..d856dd05 --- /dev/null +++ b/runtime/plangenerator/src/main/java/edu/snu/nemo/runtime/plangenerator/TestPlanGenerator.java @@ -0,0 +1,194 @@ +/* + * Copyright (C) 2017 Seoul National University + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package edu.snu.nemo.runtime.plangenerator; + +import edu.snu.nemo.common.coder.Coder; +import edu.snu.nemo.common.dag.DAG; +import edu.snu.nemo.common.dag.DAGBuilder; +import edu.snu.nemo.common.ir.edge.IREdge; +import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty; +import edu.snu.nemo.common.ir.vertex.IRVertex; +import edu.snu.nemo.common.ir.vertex.OperatorVertex; +import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty; +import edu.snu.nemo.common.ir.vertex.executionproperty.ParallelismProperty; +import edu.snu.nemo.common.ir.vertex.transform.Transform; +import edu.snu.nemo.compiler.optimizer.CompiletimeOptimizer; +import edu.snu.nemo.compiler.optimizer.examples.EmptyComponents; +import edu.snu.nemo.compiler.optimizer.policy.BasicPullPolicy; +import edu.snu.nemo.compiler.optimizer.policy.BasicPushPolicy; +import edu.snu.nemo.compiler.optimizer.policy.Policy; +import edu.snu.nemo.conf.JobConf; +import edu.snu.nemo.runtime.common.plan.physical.PhysicalPlan; +import edu.snu.nemo.runtime.common.plan.physical.PhysicalPlanGenerator; +import edu.snu.nemo.runtime.common.plan.physical.PhysicalStage; +import edu.snu.nemo.runtime.common.plan.physical.PhysicalStageEdge; +import org.apache.reef.tang.Injector; +import org.apache.reef.tang.Tang; +import org.apache.reef.tang.exceptions.InjectionException; + +/** + * Generates physical plans for testing purposes. 
+ */ +public final class TestPlanGenerator { + private static final PhysicalPlanGenerator PLAN_GENERATOR; + private static final String EMPTY_DAG_DIRECTORY = ""; + + static { + final Injector injector = Tang.Factory.getTang().newInjector(); + injector.bindVolatileParameter(JobConf.DAGDirectory.class, EMPTY_DAG_DIRECTORY); + try { + PLAN_GENERATOR = injector.getInstance(PhysicalPlanGenerator.class); + } catch (InjectionException e) { + throw new RuntimeException(e); + } + } + + /** + * Type of the plan to generate. + */ + public enum PlanType { + TwoVerticesJoined, + ThreeSequentialVertices, + ThreeSequentialVerticesWithDifferentContainerTypes + } + + /** + * private constructor. + */ + private TestPlanGenerator() { + } + + /** + * @param planType type of the plan to generate. + * @param isPush whether to use the push policy. + * @return the generated plan. + * @throws Exception exception. + */ + public static PhysicalPlan generatePhysicalPlan(final PlanType planType, final boolean isPush) throws Exception { + final Policy policy = isPush ? new BasicPushPolicy() : new BasicPullPolicy(); + switch (planType) { + case TwoVerticesJoined: + return convertIRToPhysical(getTwoVerticesJoinedDAG(), policy); + case ThreeSequentialVertices: + return convertIRToPhysical(getThreeSequentialVerticesDAG(true), policy); + case ThreeSequentialVerticesWithDifferentContainerTypes: + return convertIRToPhysical(getThreeSequentialVerticesDAG(false), policy); + default: + throw new IllegalArgumentException(planType.toString()); + } + } + + /** + * @param irDAG irDAG. + * @param policy policy. + * @return convert an IR into a physical plan using the given policy. + * @throws Exception exception. 
+ */ + private static PhysicalPlan convertIRToPhysical(final DAG<IRVertex, IREdge> irDAG, + final Policy policy) throws Exception { + final DAG<IRVertex, IREdge> optimized = CompiletimeOptimizer.optimize(irDAG, policy, EMPTY_DAG_DIRECTORY); + final DAG<PhysicalStage, PhysicalStageEdge> physicalDAG = optimized.convert(PLAN_GENERATOR); + return new PhysicalPlan("Plan", physicalDAG, PLAN_GENERATOR.getTaskIRVertexMap()); + } + + /** + * @return a dag that joins two vertices. + */ + private static DAG<IRVertex, IREdge> getTwoVerticesJoinedDAG() { + final DAGBuilder<IRVertex, IREdge> dagBuilder = new DAGBuilder<>(); + + final Transform t = new EmptyComponents.EmptyTransform("empty"); + final IRVertex v1 = new OperatorVertex(t); + v1.setProperty(ParallelismProperty.of(3)); + v1.setProperty(ExecutorPlacementProperty.of(ExecutorPlacementProperty.COMPUTE)); + dagBuilder.addVertex(v1); + + final IRVertex v2 = new OperatorVertex(t); + v2.setProperty(ParallelismProperty.of(2)); + v2.setProperty(ExecutorPlacementProperty.of(ExecutorPlacementProperty.COMPUTE)); + dagBuilder.addVertex(v2); + + final IRVertex v3 = new OperatorVertex(t); + v3.setProperty(ParallelismProperty.of(3)); + v3.setProperty(ExecutorPlacementProperty.of(ExecutorPlacementProperty.COMPUTE)); + dagBuilder.addVertex(v3); + + final IRVertex v4 = new OperatorVertex(t); + v4.setProperty(ParallelismProperty.of(2)); + v4.setProperty(ExecutorPlacementProperty.of(ExecutorPlacementProperty.COMPUTE)); + dagBuilder.addVertex(v4); + + final IRVertex v5 = new OperatorVertex(t); + v5.setProperty(ParallelismProperty.of(2)); + v5.setProperty(ExecutorPlacementProperty.of(ExecutorPlacementProperty.COMPUTE)); + dagBuilder.addVertex(v5); + + final IREdge e1 = new IREdge(DataCommunicationPatternProperty.Value.Shuffle, v1, v2, Coder.DUMMY_CODER); + dagBuilder.connectVertices(e1); + + final IREdge e2 = new IREdge(DataCommunicationPatternProperty.Value.Shuffle, v3, v2, Coder.DUMMY_CODER); + dagBuilder.connectVertices(e2); + + final 
IREdge e3 = new IREdge(DataCommunicationPatternProperty.Value.Shuffle, v2, v4, Coder.DUMMY_CODER); + dagBuilder.connectVertices(e3); + + final IREdge e4 = new IREdge(DataCommunicationPatternProperty.Value.OneToOne, v4, v5, Coder.DUMMY_CODER); + dagBuilder.connectVertices(e4); + + return dagBuilder.buildWithoutSourceSinkCheck(); + } + + /** + * @param sameContainerType whether all three vertices are of the same container type + * @return a dag with 3 sequential vertices. + */ + private static DAG<IRVertex, IREdge> getThreeSequentialVerticesDAG(final boolean sameContainerType) { + final DAGBuilder<IRVertex, IREdge> dagBuilder = new DAGBuilder<>(); + + final Transform t = new EmptyComponents.EmptyTransform("empty"); + final IRVertex v1 = new OperatorVertex(t); + v1.setProperty(ParallelismProperty.of(3)); + v1.setProperty(ExecutorPlacementProperty.of(ExecutorPlacementProperty.COMPUTE)); + dagBuilder.addVertex(v1); + + final IRVertex v2 = new OperatorVertex(t); + v2.setProperty(ParallelismProperty.of(2)); + if (sameContainerType) { + v2.setProperty(ExecutorPlacementProperty.of(ExecutorPlacementProperty.COMPUTE)); + } else { + v2.setProperty(ExecutorPlacementProperty.of(ExecutorPlacementProperty.TRANSIENT)); + } + dagBuilder.addVertex(v2); + + final IRVertex v3 = new OperatorVertex(t); + v3.setProperty(ParallelismProperty.of(2)); + if (sameContainerType) { + v3.setProperty(ExecutorPlacementProperty.of(ExecutorPlacementProperty.COMPUTE)); + } else { + v3.setProperty(ExecutorPlacementProperty.of(ExecutorPlacementProperty.TRANSIENT)); + } + dagBuilder.addVertex(v3); + + final IREdge e1 = new IREdge(DataCommunicationPatternProperty.Value.Shuffle, v1, v2, Coder.DUMMY_CODER); + dagBuilder.connectVertices(e1); + + final IREdge e2 = new IREdge(DataCommunicationPatternProperty.Value.OneToOne, v2, v3, Coder.DUMMY_CODER); + dagBuilder.connectVertices(e2); + + return dagBuilder.buildWithoutSourceSinkCheck(); + } +} + diff --git 
a/tests/src/test/java/edu/snu/nemo/tests/runtime/RuntimeTestUtil.java b/tests/src/test/java/edu/snu/nemo/tests/runtime/RuntimeTestUtil.java index 49d5d4dc..fbe00d27 100644 --- a/tests/src/test/java/edu/snu/nemo/tests/runtime/RuntimeTestUtil.java +++ b/tests/src/test/java/edu/snu/nemo/tests/runtime/RuntimeTestUtil.java @@ -15,15 +15,6 @@ */ package edu.snu.nemo.tests.runtime; -import edu.snu.nemo.runtime.common.plan.physical.*; -import edu.snu.nemo.runtime.common.state.StageState; -import edu.snu.nemo.runtime.common.state.TaskGroupState; -import edu.snu.nemo.runtime.master.JobStateManager; -import edu.snu.nemo.runtime.master.scheduler.ExecutorRegistry; -import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter; -import edu.snu.nemo.runtime.master.scheduler.PendingTaskGroupCollection; -import edu.snu.nemo.runtime.master.scheduler.Scheduler; -import edu.snu.nemo.runtime.master.scheduler.SchedulingPolicy; import org.apache.beam.sdk.values.KV; import java.util.*; @@ -34,123 +25,6 @@ * Utility class for runtime unit tests. */ public final class RuntimeTestUtil { - /** - * Complete the stage by completing all of its TaskGroups. - * @param jobStateManager for the submitted job. - * @param scheduler for the submitted job. - * @param executorRegistry provides executor representers - * @param physicalStage for which the states should be marked as complete. - */ - public static void completeStage(final JobStateManager jobStateManager, - final Scheduler scheduler, - final ExecutorRegistry executorRegistry, - final PhysicalStage physicalStage, - final int attemptIdx) { - // Loop until the stage completes. - while (true) { - final Enum stageState = jobStateManager.getStageState(physicalStage.getId()).getStateMachine().getCurrentState(); - if (StageState.State.COMPLETE == stageState) { - // Stage has completed, so we break out of the loop. 
- break; - } else if (StageState.State.EXECUTING == stageState) { - physicalStage.getTaskGroupIds().forEach(taskGroupId -> { - final Enum tgState = jobStateManager.getTaskGroupState(taskGroupId).getStateMachine().getCurrentState(); - if (TaskGroupState.State.EXECUTING == tgState) { - sendTaskGroupStateEventToScheduler(scheduler, executorRegistry, taskGroupId, - TaskGroupState.State.COMPLETE, attemptIdx, null); - } else if (TaskGroupState.State.READY == tgState || TaskGroupState.State.COMPLETE == tgState) { - // Skip READY (try in the next loop and see if it becomes EXECUTING) and COMPLETE. - } else { - throw new IllegalStateException(tgState.toString()); - } - }); - } else if (StageState.State.READY == stageState) { - // Skip and retry in the next loop. - } else { - throw new IllegalStateException(stageState.toString()); - } - } - } - - /** - * Sends task group state change event to scheduler. - * This replaces executor's task group completion messages for testing purposes. - * @param scheduler for the submitted job. - * @param executorRegistry provides executor representers - * @param taskGroupId for the task group to change the state. - * @param newState for the task group. - * @param cause in the case of a recoverable failure. 
- */ - public static void sendTaskGroupStateEventToScheduler(final Scheduler scheduler, - final ExecutorRegistry executorRegistry, - final String taskGroupId, - final TaskGroupState.State newState, - final int attemptIdx, - final TaskGroupState.RecoverableFailureCause cause) { - ExecutorRepresenter scheduledExecutor; - do { - scheduledExecutor = findExecutorForTaskGroup(executorRegistry, taskGroupId); - } while (scheduledExecutor == null); - - scheduler.onTaskGroupStateChanged(scheduledExecutor.getExecutorId(), taskGroupId, - newState, attemptIdx, null, cause); - } - - public static void sendTaskGroupStateEventToScheduler(final Scheduler scheduler, - final ExecutorRegistry executorRegistry, - final String taskGroupId, - final TaskGroupState.State newState, - final int attemptIdx) { - sendTaskGroupStateEventToScheduler(scheduler, executorRegistry, taskGroupId, newState, attemptIdx, null); - } - - public static void mockSchedulerRunner(final PendingTaskGroupCollection pendingTaskGroupCollection, - final SchedulingPolicy schedulingPolicy, - final JobStateManager jobStateManager, - final ExecutorRegistry executorRegistry, - final boolean isPartialSchedule) { - while (!pendingTaskGroupCollection.isEmpty()) { - final ScheduledTaskGroup taskGroupToSchedule = pendingTaskGroupCollection.remove( - pendingTaskGroupCollection.peekSchedulableTaskGroups().get().iterator().next().getTaskGroupId()); - - final Set<ExecutorRepresenter> runningExecutorRepresenter = - executorRegistry.getRunningExecutorIds().stream() - .map(executorId -> executorRegistry.getExecutorRepresenter(executorId)) - .collect(Collectors.toSet()); - final Set<ExecutorRepresenter> candidateExecutors = - schedulingPolicy.filterExecutorRepresenters(runningExecutorRepresenter, taskGroupToSchedule); - if (candidateExecutors.size() > 0) { - jobStateManager.onTaskGroupStateChanged(taskGroupToSchedule.getTaskGroupId(), - TaskGroupState.State.EXECUTING); - final ExecutorRepresenter executor = 
candidateExecutors.stream().findFirst().get(); - executor.onTaskGroupScheduled(taskGroupToSchedule); - } - - // Schedule only the first task group. - if (isPartialSchedule) { - break; - } - } - } - - /** - * Retrieves the executor to which the given task group was scheduled. - * @param taskGroupId of the task group to search. - * @param executorRegistry provides executor representers - * @return the {@link ExecutorRepresenter} of the executor the task group was scheduled to. - */ - private static ExecutorRepresenter findExecutorForTaskGroup(final ExecutorRegistry executorRegistry, - final String taskGroupId) { - for (final String executorId : executorRegistry.getRunningExecutorIds()) { - final ExecutorRepresenter executor = executorRegistry.getRunningExecutorRepresenter(executorId); - if (executor.getRunningTaskGroups().contains(taskGroupId) - || executor.getCompleteTaskGroups().contains(taskGroupId)) { - return executor; - } - } - return null; - } - /** * Gets a list of integer pair elements in range. * @param start value of the range (inclusive). ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org With regards, Apache Git Services
