wonook closed pull request #57: [NEMO-73,75] SchedulingPolicy as Vertex-level Execution Property URL: https://github.com/apache/incubator-nemo/pull/57
This is a PR merged from a forked repository. Because GitHub hides the original diff of a foreign pull request (one made from a fork) once it is merged, the diff is reproduced below for the sake of provenance: diff --git a/common/src/main/java/edu/snu/nemo/common/ir/executionproperty/AssociatedProperty.java b/common/src/main/java/edu/snu/nemo/common/ir/executionproperty/AssociatedProperty.java new file mode 100644 index 00000000..ceebd861 --- /dev/null +++ b/common/src/main/java/edu/snu/nemo/common/ir/executionproperty/AssociatedProperty.java @@ -0,0 +1,28 @@ +/* + * Copyright (C) 2018 Seoul National University + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package edu.snu.nemo.common.ir.executionproperty; + +import java.lang.annotation.*; + +/** + * Declares associated {@link ExecutionProperty} for implementations. + */ +@Target({ElementType.TYPE}) +@Documented +@Retention(RetentionPolicy.RUNTIME) +public @interface AssociatedProperty { + Class<? 
extends ExecutionProperty> value(); +} diff --git a/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/ExecutorSlotComplianceProperty.java b/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/ExecutorSlotComplianceProperty.java new file mode 100644 index 00000000..357be21d --- /dev/null +++ b/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/ExecutorSlotComplianceProperty.java @@ -0,0 +1,46 @@ +/* + * Copyright (C) 2018 Seoul National University + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package edu.snu.nemo.common.ir.vertex.executionproperty; + +import edu.snu.nemo.common.ir.executionproperty.VertexExecutionProperty; + +/** + * This property decides whether or not to comply to slot restrictions when scheduling this vertex. + */ +public final class ExecutorSlotComplianceProperty extends VertexExecutionProperty<Boolean> { + private static final ExecutorSlotComplianceProperty COMPLIANCE_TRUE = new ExecutorSlotComplianceProperty(true); + private static final ExecutorSlotComplianceProperty COMPLIANCE_FALSE + = new ExecutorSlotComplianceProperty(false); + + /** + * Default constructor. + * + * @param value value of the ExecutionProperty + */ + private ExecutorSlotComplianceProperty(final boolean value) { + super(value); + } + + /** + * Static method getting execution property. 
+ * + * @param value value of the new execution property + * @return the execution property + */ + public static ExecutorSlotComplianceProperty of(final boolean value) { + return value ? COMPLIANCE_TRUE : COMPLIANCE_FALSE; + } +} diff --git a/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/ScheduleGroupIndexProperty.java b/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/ScheduleGroupProperty.java similarity index 76% rename from common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/ScheduleGroupIndexProperty.java rename to common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/ScheduleGroupProperty.java index 08518ff9..3b521cbb 100644 --- a/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/ScheduleGroupIndexProperty.java +++ b/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/ScheduleGroupProperty.java @@ -18,14 +18,14 @@ import edu.snu.nemo.common.ir.executionproperty.VertexExecutionProperty; /** - * ScheduleGroupIndex ExecutionProperty. + * ScheduleGroup ExecutionProperty. */ -public final class ScheduleGroupIndexProperty extends VertexExecutionProperty<Integer> { +public final class ScheduleGroupProperty extends VertexExecutionProperty<Integer> { /** * Constructor. * @param value value of the execution property. */ - private ScheduleGroupIndexProperty(final Integer value) { + private ScheduleGroupProperty(final Integer value) { super(value); } @@ -34,7 +34,7 @@ private ScheduleGroupIndexProperty(final Integer value) { * @param value value of the new execution property. * @return the newly created execution property. 
*/ - public static ScheduleGroupIndexProperty of(final Integer value) { - return new ScheduleGroupIndexProperty(value); + public static ScheduleGroupProperty of(final Integer value) { + return new ScheduleGroupProperty(value); } } diff --git a/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/SourceLocationAwareSchedulingProperty.java b/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/SourceLocationAwareSchedulingProperty.java new file mode 100644 index 00000000..cad987da --- /dev/null +++ b/common/src/main/java/edu/snu/nemo/common/ir/vertex/executionproperty/SourceLocationAwareSchedulingProperty.java @@ -0,0 +1,47 @@ +/* + * Copyright (C) 2018 Seoul National University + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package edu.snu.nemo.common.ir.vertex.executionproperty; + +import edu.snu.nemo.common.ir.executionproperty.VertexExecutionProperty; + +/** + * This property decides whether or not to schedule this vertex only on executors where source data reside. + */ +public final class SourceLocationAwareSchedulingProperty extends VertexExecutionProperty<Boolean> { + private static final SourceLocationAwareSchedulingProperty SOURCE_TRUE + = new SourceLocationAwareSchedulingProperty(true); + private static final SourceLocationAwareSchedulingProperty SOURCE_FALSE + = new SourceLocationAwareSchedulingProperty(false); + + /** + * Default constructor. 
+ * + * @param value value of the ExecutionProperty + */ + private SourceLocationAwareSchedulingProperty(final boolean value) { + super(value); + } + + /** + * Static method getting execution property. + * + * @param value value of the new execution property + * @return the execution property + */ + public static SourceLocationAwareSchedulingProperty of(final boolean value) { + return value ? SOURCE_TRUE : SOURCE_FALSE; + } +} diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultScheduleGroupPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultScheduleGroupPass.java index 38f964cb..ca788bdf 100644 --- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultScheduleGroupPass.java +++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultScheduleGroupPass.java @@ -23,7 +23,7 @@ import edu.snu.nemo.common.ir.edge.executionproperty.DataCommunicationPatternProperty; import edu.snu.nemo.common.ir.vertex.IRVertex; import edu.snu.nemo.common.ir.edge.executionproperty.DataFlowModelProperty; -import edu.snu.nemo.common.ir.vertex.executionproperty.ScheduleGroupIndexProperty; +import edu.snu.nemo.common.ir.vertex.executionproperty.ScheduleGroupProperty; import org.apache.commons.lang3.mutable.MutableInt; import java.util.*; @@ -71,7 +71,7 @@ public DefaultScheduleGroupPass() { public DefaultScheduleGroupPass(final boolean allowBroadcastWithinScheduleGroup, final boolean allowShuffleWithinScheduleGroup, final boolean allowMultipleInEdgesWithinScheduleGroup) { - super(ScheduleGroupIndexProperty.class, Stream.of( + super(ScheduleGroupProperty.class, Stream.of( DataCommunicationPatternProperty.class, DataFlowModelProperty.class ).collect(Collectors.toSet())); @@ -93,7 +93,7 @@ public DefaultScheduleGroupPass(final boolean allowBroadcastWithinScheduleGroup, 
newScheduleGroup.vertices.add(irVertex); irVertexToScheduleGroupMap.put(irVertex, newScheduleGroup); } - // Get scheduleGroupIndex + // Get scheduleGroup final ScheduleGroup scheduleGroup = irVertexToScheduleGroupMap.get(irVertex); if (scheduleGroup == null) { throw new RuntimeException(String.format("ScheduleGroup must be set for %s", irVertex)); @@ -203,42 +203,42 @@ public DefaultScheduleGroupPass(final boolean allowBroadcastWithinScheduleGroup, } }); - // Assign ScheduleGroupIndex property based on topology of ScheduleGroups - final MutableInt currentScheduleGroupIndex = new MutableInt(getNextScheudleGroupIndex(dag.getVertices())); + // Assign ScheduleGroup property based on topology of ScheduleGroups + final MutableInt currentScheduleGroup = new MutableInt(getNextScheudleGroup(dag.getVertices())); final DAGBuilder<ScheduleGroup, ScheduleGroupEdge> scheduleGroupDAGBuilder = new DAGBuilder<>(); scheduleGroups.forEach(scheduleGroupDAGBuilder::addVertex); scheduleGroups.forEach(src -> src.scheduleGroupsTo .forEach(dst -> scheduleGroupDAGBuilder.connectVertices(new ScheduleGroupEdge(src, dst)))); scheduleGroupDAGBuilder.build().topologicalDo(scheduleGroup -> { - boolean usedCurrentIndex = false; + boolean usedCurrentScheduleGroup = false; for (final IRVertex irVertex : scheduleGroup.vertices) { - if (!irVertex.getPropertyValue(ScheduleGroupIndexProperty.class).isPresent()) { - irVertex.getExecutionProperties().put(ScheduleGroupIndexProperty.of(currentScheduleGroupIndex.getValue())); - usedCurrentIndex = true; + if (!irVertex.getPropertyValue(ScheduleGroupProperty.class).isPresent()) { + irVertex.getExecutionProperties().put(ScheduleGroupProperty.of(currentScheduleGroup.getValue())); + usedCurrentScheduleGroup = true; } } - if (usedCurrentIndex) { - currentScheduleGroupIndex.increment(); + if (usedCurrentScheduleGroup) { + currentScheduleGroup.increment(); } }); return dag; } /** - * Determines the range of {@link ScheduleGroupIndexProperty} value that will prevent 
collision - * with the existing {@link ScheduleGroupIndexProperty}. + * Determines the range of {@link ScheduleGroupProperty} value that will prevent collision + * with the existing {@link ScheduleGroupProperty}. * @param irVertexCollection collection of {@link IRVertex} - * @return the minimum value for the {@link ScheduleGroupIndexProperty} that won't collide with the existing values + * @return the minimum value for the {@link ScheduleGroupProperty} that won't collide with the existing values */ - private int getNextScheudleGroupIndex(final Collection<IRVertex> irVertexCollection) { - int nextScheduleGroupIndex = 0; + private int getNextScheudleGroup(final Collection<IRVertex> irVertexCollection) { + int nextScheduleGroup = 0; for (final IRVertex irVertex : irVertexCollection) { - final Optional<Integer> scheduleGroupIndex = irVertex.getPropertyValue(ScheduleGroupIndexProperty.class); - if (scheduleGroupIndex.isPresent()) { - nextScheduleGroupIndex = Math.max(scheduleGroupIndex.get() + 1, nextScheduleGroupIndex); + final Optional<Integer> scheduleGroup = irVertex.getPropertyValue(ScheduleGroupProperty.class); + if (scheduleGroup.isPresent()) { + nextScheduleGroup = Math.max(scheduleGroup.get() + 1, nextScheduleGroup); } } - return nextScheduleGroupIndex; + return nextScheduleGroup; } /** diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/ExecutorSlotCompliancePass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/ExecutorSlotCompliancePass.java new file mode 100644 index 00000000..e1b2123d --- /dev/null +++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/ExecutorSlotCompliancePass.java @@ -0,0 +1,40 @@ +/* + * Copyright (C) 2018 Seoul National University + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating; + +import edu.snu.nemo.common.dag.DAG; +import edu.snu.nemo.common.ir.edge.IREdge; +import edu.snu.nemo.common.ir.vertex.IRVertex; +import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorSlotComplianceProperty; + +/** + * Sets {@link ExecutorSlotComplianceProperty}. + */ +public final class ExecutorSlotCompliancePass extends AnnotatingPass { + + public ExecutorSlotCompliancePass() { + super(ExecutorSlotComplianceProperty.class); + } + + @Override + public DAG<IRVertex, IREdge> apply(final DAG<IRVertex, IREdge> dag) { + // On every vertex, if ExecutorSlotComplianceProperty is not set, put it as true. 
+ dag.getVertices().stream() + .filter(v -> !v.getExecutionProperties().containsKey(ExecutorSlotComplianceProperty.class)) + .forEach(v -> v.getExecutionProperties().put(ExecutorSlotComplianceProperty.of(true))); + return dag; + } +} diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SourceLocationAwareSchedulingPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SourceLocationAwareSchedulingPass.java new file mode 100644 index 00000000..8c30d95d --- /dev/null +++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SourceLocationAwareSchedulingPass.java @@ -0,0 +1,40 @@ +/* + * Copyright (C) 2018 Seoul National University + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating; + +import edu.snu.nemo.common.dag.DAG; +import edu.snu.nemo.common.ir.edge.IREdge; +import edu.snu.nemo.common.ir.vertex.IRVertex; +import edu.snu.nemo.common.ir.vertex.executionproperty.SourceLocationAwareSchedulingProperty; + +/** + * Sets {@link SourceLocationAwareSchedulingProperty}. 
+ */ +public final class SourceLocationAwareSchedulingPass extends AnnotatingPass { + + public SourceLocationAwareSchedulingPass() { + super(SourceLocationAwareSchedulingProperty.class); + } + + @Override + public DAG<IRVertex, IREdge> apply(final DAG<IRVertex, IREdge> dag) { + // On every vertex, if SourceLocationAwareSchedulingProperty is not set, put it as true. + dag.getVertices().stream() + .filter(v -> !v.getExecutionProperties().containsKey(SourceLocationAwareSchedulingProperty.class)) + .forEach(v -> v.getExecutionProperties().put(SourceLocationAwareSchedulingProperty.of(true))); + return dag; + } +} diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/PrimitiveCompositePass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/PrimitiveCompositePass.java index 3a5c80c9..a30abec1 100644 --- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/PrimitiveCompositePass.java +++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/PrimitiveCompositePass.java @@ -37,7 +37,9 @@ public PrimitiveCompositePass() { new DefaultEdgeUsedDataHandlingPass(), new DefaultScheduleGroupPass(), new CompressionPass(), - new DecompressionPass() + new DecompressionPass(), + new SourceLocationAwareSchedulingPass(), + new ExecutorSlotCompliancePass() )); } } diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGenerator.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGenerator.java index 107a6231..b9b97959 100644 --- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGenerator.java +++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGenerator.java @@ -24,7 +24,7 @@ import edu.snu.nemo.common.ir.vertex.*; import edu.snu.nemo.common.ir.vertex.executionproperty.DynamicOptimizationProperty; 
import edu.snu.nemo.common.ir.vertex.executionproperty.ParallelismProperty; -import edu.snu.nemo.common.ir.vertex.executionproperty.ScheduleGroupIndexProperty; +import edu.snu.nemo.common.ir.vertex.executionproperty.ScheduleGroupProperty; import edu.snu.nemo.conf.JobConf; import edu.snu.nemo.common.dag.DAG; import edu.snu.nemo.common.dag.DAGBuilder; @@ -226,8 +226,8 @@ private void handleDuplicateEdgeGroupProperty(final DAG<Stage, StageEdge> dagOfS private void integrityCheck(final Stage stage) { stage.getPropertyValue(ParallelismProperty.class) .orElseThrow(() -> new RuntimeException("Parallelism property must be set for Stage")); - stage.getPropertyValue(ScheduleGroupIndexProperty.class) - .orElseThrow(() -> new RuntimeException("ScheduleGroupIndex property must be set for Stage")); + stage.getPropertyValue(ScheduleGroupProperty.class) + .orElseThrow(() -> new RuntimeException("ScheduleGroup property must be set for Stage")); stage.getIRDAG().getVertices().forEach(irVertex -> { // Check vertex type. @@ -241,24 +241,24 @@ private void integrityCheck(final Stage stage) { /** * Split ScheduleGroups by Pull {@link StageEdge}s, and ensure topological ordering of - * {@link ScheduleGroupIndexProperty}. + * {@link ScheduleGroupProperty}. 
* * @param dag {@link DAG} of {@link Stage}s to manipulate */ private void splitScheduleGroupByPullStageEdges(final DAG<Stage, StageEdge> dag) { - final MutableInt nextScheduleGroupIndex = new MutableInt(0); - final Map<Stage, Integer> stageToScheduleGroupIndexMap = new HashMap<>(); + final MutableInt nextScheduleGroup = new MutableInt(0); + final Map<Stage, Integer> stageToScheduleGroupMap = new HashMap<>(); dag.topologicalDo(currentStage -> { - // Base case: assign New ScheduleGroupIndex of the Stage - stageToScheduleGroupIndexMap.computeIfAbsent(currentStage, s -> getAndIncrement(nextScheduleGroupIndex)); + // Base case: assign New ScheduleGroup of the Stage + stageToScheduleGroupMap.computeIfAbsent(currentStage, s -> getAndIncrement(nextScheduleGroup)); for (final StageEdge stageEdgeFromCurrentStage : dag.getOutgoingEdgesOf(currentStage)) { final Stage destination = stageEdgeFromCurrentStage.getDst(); - // Skip if some Stages that destination depends on do not have assigned new ScheduleGroupIndex + // Skip if some Stages that destination depends on do not have assigned new ScheduleGroup boolean skip = false; for (final StageEdge stageEdgeToDestination : dag.getIncomingEdgesOf(destination)) { - if (!stageToScheduleGroupIndexMap.containsKey(stageEdgeToDestination.getSrc())) { + if (!stageToScheduleGroupMap.containsKey(stageEdgeToDestination.getSrc())) { skip = true; break; } @@ -266,42 +266,42 @@ private void splitScheduleGroupByPullStageEdges(final DAG<Stage, StageEdge> dag) if (skip) { continue; } - if (stageToScheduleGroupIndexMap.containsKey(destination)) { + if (stageToScheduleGroupMap.containsKey(destination)) { continue; } // Find any non-pull inEdge - Integer scheduleGroupIndex = null; - Integer newScheduleGroupIndex = null; + Integer scheduleGroup = null; + Integer newScheduleGroup = null; for (final StageEdge stageEdge : dag.getIncomingEdgesOf(destination)) { final Stage source = stageEdge.getSrc(); if (stageEdge.getDataFlowModel() != 
DataFlowModelProperty.Value.Pull) { - if (scheduleGroupIndex != null && source.getScheduleGroupIndex() != scheduleGroupIndex) { + if (scheduleGroup != null && source.getScheduleGroup() != scheduleGroup) { throw new RuntimeException(String.format("Multiple Push inEdges from different ScheduleGroup: %d, %d", - scheduleGroupIndex, source.getScheduleGroupIndex())); + scheduleGroup, source.getScheduleGroup())); } - if (source.getScheduleGroupIndex() != destination.getScheduleGroupIndex()) { + if (source.getScheduleGroup() != destination.getScheduleGroup()) { throw new RuntimeException(String.format("Split ScheduleGroup by push StageEdge: %d, %d", - source.getScheduleGroupIndex(), destination.getScheduleGroupIndex())); + source.getScheduleGroup(), destination.getScheduleGroup())); } - scheduleGroupIndex = source.getScheduleGroupIndex(); - newScheduleGroupIndex = stageToScheduleGroupIndexMap.get(source); + scheduleGroup = source.getScheduleGroup(); + newScheduleGroup = stageToScheduleGroupMap.get(source); } } - if (newScheduleGroupIndex == null) { - stageToScheduleGroupIndexMap.put(destination, getAndIncrement(nextScheduleGroupIndex)); + if (newScheduleGroup == null) { + stageToScheduleGroupMap.put(destination, getAndIncrement(nextScheduleGroup)); } else { - stageToScheduleGroupIndexMap.put(destination, newScheduleGroupIndex); + stageToScheduleGroupMap.put(destination, newScheduleGroup); } } }); dag.topologicalDo(stage -> { - final int scheduleGroupIndex = stageToScheduleGroupIndexMap.get(stage); - stage.getExecutionProperties().put(ScheduleGroupIndexProperty.of(scheduleGroupIndex)); + final int scheduleGroup = stageToScheduleGroupMap.get(stage); + stage.getExecutionProperties().put(ScheduleGroupProperty.of(scheduleGroup)); stage.getIRDAG().topologicalDo(vertex -> vertex.getExecutionProperties() - .put(ScheduleGroupIndexProperty.of(scheduleGroupIndex))); + .put(ScheduleGroupProperty.of(scheduleGroup))); }); } diff --git 
a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/Stage.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/Stage.java index 9e5da38b..c2abcc4e 100644 --- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/Stage.java +++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/Stage.java @@ -22,7 +22,7 @@ import edu.snu.nemo.common.ir.executionproperty.VertexExecutionProperty; import edu.snu.nemo.common.ir.vertex.IRVertex; import edu.snu.nemo.common.ir.vertex.executionproperty.ParallelismProperty; -import edu.snu.nemo.common.ir.vertex.executionproperty.ScheduleGroupIndexProperty; +import edu.snu.nemo.common.ir.vertex.executionproperty.ScheduleGroupProperty; import edu.snu.nemo.runtime.common.RuntimeIdGenerator; import org.apache.commons.lang3.SerializationUtils; @@ -94,11 +94,11 @@ public int getParallelism() { } /** - * @return the schedule group index. + * @return the schedule group. */ - public int getScheduleGroupIndex() { - return executionProperties.get(ScheduleGroupIndexProperty.class) - .orElseThrow(() -> new RuntimeException("ScheduleGroupIndex property must be set for Stage")); + public int getScheduleGroup() { + return executionProperties.get(ScheduleGroupProperty.class) + .orElseThrow(() -> new RuntimeException("ScheduleGroup property must be set for Stage")); } /** @@ -130,7 +130,7 @@ public int getScheduleGroupIndex() { @Override public String propertiesToJSON() { final StringBuilder sb = new StringBuilder(); - sb.append("{\"scheduleGroupIndex\": ").append(getScheduleGroupIndex()); + sb.append("{\"scheduleGroup\": ").append(getScheduleGroup()); sb.append(", \"irDag\": ").append(irDag); sb.append(", \"parallelism\": ").append(getParallelism()); sb.append(", \"executionProperties\": ").append(executionProperties); diff --git a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java 
b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java index 846bccab..c2919a32 100644 --- a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java +++ b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java @@ -23,7 +23,7 @@ import edu.snu.nemo.common.ir.vertex.SourceVertex; import edu.snu.nemo.common.ir.vertex.IRVertex; import edu.snu.nemo.common.ir.vertex.executionproperty.ParallelismProperty; -import edu.snu.nemo.common.ir.vertex.executionproperty.ScheduleGroupIndexProperty; +import edu.snu.nemo.common.ir.vertex.executionproperty.ScheduleGroupProperty; import edu.snu.nemo.common.test.EmptyComponents; import edu.snu.nemo.conf.JobConf; import edu.snu.nemo.common.Pair; @@ -131,9 +131,10 @@ public void setUp() throws InjectionException { final MetricMessageHandler metricMessageHandler = mock(MetricMessageHandler.class); final PubSubEventHandlerWrapper pubSubEventHandler = mock(PubSubEventHandlerWrapper.class); final UpdatePhysicalPlanEventHandler updatePhysicalPlanEventHandler = mock(UpdatePhysicalPlanEventHandler.class); - final SchedulingPolicy schedulingPolicy = injector.getInstance(CompositeSchedulingPolicy.class); + final SchedulingConstraintRegistry schedulingConstraint = injector.getInstance(SchedulingConstraintRegistry.class); + final SchedulingPolicy schedulingPolicy = injector.getInstance(SchedulingPolicy.class); final PendingTaskCollectionPointer taskQueue = new PendingTaskCollectionPointer(); - final SchedulerRunner schedulerRunner = new SchedulerRunner(schedulingPolicy, taskQueue, executorRegistry); + final SchedulerRunner schedulerRunner = new SchedulerRunner(schedulingConstraint, schedulingPolicy, taskQueue, executorRegistry); final Scheduler scheduler = new BatchSingleJobScheduler( schedulerRunner, taskQueue, master, pubSubEventHandler, updatePhysicalPlanEventHandler, executorRegistry); final AtomicInteger executorCount = new 
AtomicInteger(0); @@ -549,7 +550,7 @@ private Stage setupStages(final String stageId) { final ExecutionPropertyMap<VertexExecutionProperty> stageExecutionProperty = new ExecutionPropertyMap<>(stageId); stageExecutionProperty.put(ParallelismProperty.of(PARALLELISM_TEN)); - stageExecutionProperty.put(ScheduleGroupIndexProperty.of(0)); + stageExecutionProperty.put(ScheduleGroupProperty.of(0)); return new Stage(stageId, emptyDag, stageExecutionProperty, Collections.emptyList()); } } diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java index 4bd8d4fb..40094243 100644 --- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java +++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobScheduler.java @@ -114,7 +114,7 @@ public void scheduleJob(final PhysicalPlan physicalPlanOfJob, final JobStateMana LOG.info("Job to schedule: {}", physicalPlanOfJob.getId()); this.initialScheduleGroup = physicalPlanOfJob.getStageDAG().getVertices().stream() - .mapToInt(stage -> stage.getScheduleGroupIndex()) + .mapToInt(stage -> stage.getScheduleGroup()) .min().getAsInt(); scheduleNextScheduleGroup(initialScheduleGroup); @@ -213,7 +213,7 @@ public void onExecutorRemoved(final String executorId) { // Schedule a stage after marking the necessary tasks to failed_recoverable. // The stage for one of the tasks that failed is a starting point to look // for the next stage to be scheduled. - scheduleNextScheduleGroup(getSchedulingIndexOfStage( + scheduleNextScheduleGroup(getScheduleGroupOfStage( RuntimeIdGenerator.getStageIdFromTaskId(tasksToReExecute.iterator().next()))); } } @@ -245,7 +245,7 @@ private void scheduleNextScheduleGroup(final int referenceIndex) { /** * Selects the next stage to schedule. 
- * It takes the referenceScheduleGroupIndex as a reference point to begin looking for the stages to execute: + * It takes the referenceScheduleGroup as a reference point to begin looking for the stages to execute: * * a) returns the failed_recoverable stage(s) of the earliest schedule group, if it(they) exists. * b) returns an empty optional if there are no schedulable stages at the moment. @@ -253,15 +253,15 @@ private void scheduleNextScheduleGroup(final int referenceIndex) { * - if an ancestor schedule group is still executing * c) returns the next set of schedulable stages (if the current schedule group has completed execution) * - * @param referenceScheduleGroupIndex + * @param referenceScheduleGroup * the index of the schedule group that is executing/has executed when this method is called. * @return an optional of the (possibly empty) next schedulable stage */ - private Optional<List<Stage>> selectNextScheduleGroupToSchedule(final int referenceScheduleGroupIndex) { + private Optional<List<Stage>> selectNextScheduleGroupToSchedule(final int referenceScheduleGroup) { // Recursively check the previous schedule group. - if (referenceScheduleGroupIndex > initialScheduleGroup) { + if (referenceScheduleGroup > initialScheduleGroup) { final Optional<List<Stage>> ancestorStagesFromAScheduleGroup = - selectNextScheduleGroupToSchedule(referenceScheduleGroupIndex - 1); + selectNextScheduleGroupToSchedule(referenceScheduleGroup - 1); if (ancestorStagesFromAScheduleGroup.isPresent()) { // Nothing to schedule from the previous schedule group. return ancestorStagesFromAScheduleGroup; @@ -277,7 +277,7 @@ private void scheduleNextScheduleGroup(final int referenceIndex) { // All previous schedule groups are complete, we need to check for the current schedule group. 
final List<Stage> currentScheduleGroup = reverseTopoStages .stream() - .filter(stage -> stage.getScheduleGroupIndex() == referenceScheduleGroupIndex) + .filter(stage -> stage.getScheduleGroup() == referenceScheduleGroup) .collect(Collectors.toList()); final boolean allStagesOfThisGroupComplete = currentScheduleGroup .stream() @@ -286,7 +286,7 @@ private void scheduleNextScheduleGroup(final int referenceIndex) { .allMatch(state -> state.equals(StageState.State.COMPLETE)); if (!allStagesOfThisGroupComplete) { - LOG.info("There are remaining stages in the current schedule group, {}", referenceScheduleGroupIndex); + LOG.info("There are remaining stages in the current schedule group, {}", referenceScheduleGroup); final List<Stage> stagesToSchedule = currentScheduleGroup .stream() .filter(stage -> { @@ -304,7 +304,7 @@ private void scheduleNextScheduleGroup(final int referenceIndex) { final List<Stage> stagesToSchedule = reverseTopoStages .stream() .filter(stage -> { - if (stage.getScheduleGroupIndex() == referenceScheduleGroupIndex + 1) { + if (stage.getScheduleGroup() == referenceScheduleGroup + 1) { final String stageId = stage.getId(); return jobStateManager.getStageState(stageId) != StageState.State.EXECUTING && jobStateManager.getStageState(stageId) != StageState.State.COMPLETE; @@ -314,7 +314,7 @@ private void scheduleNextScheduleGroup(final int referenceIndex) { .collect(Collectors.toList()); if (stagesToSchedule.isEmpty()) { - LOG.debug("ScheduleGroup {}: already executing/complete!, so we skip this", referenceScheduleGroupIndex + 1); + LOG.debug("ScheduleGroup {}: already executing/complete!, so we skip this", referenceScheduleGroup + 1); return Optional.empty(); } @@ -433,7 +433,7 @@ private void onTaskExecutionComplete(final String executorId, if (jobStateManager.getStageState(stageIdForTaskUponCompletion).equals(StageState.State.COMPLETE)) { // if the stage this task belongs to is complete, if (!jobStateManager.isJobDone()) { - 
scheduleNextScheduleGroup(getSchedulingIndexOfStage(stageIdForTaskUponCompletion)); + scheduleNextScheduleGroup(getScheduleGroupOfStage(stageIdForTaskUponCompletion)); } } schedulerRunner.onAnExecutorAvailable(); @@ -502,7 +502,7 @@ private void onTaskExecutionFailedRecoverable(final String executorId, // TODO #50: Carefully retry tasks in the scheduler case OUTPUT_WRITE_FAILURE: blockManagerMaster.onProducerTaskFailed(taskId); - scheduleNextScheduleGroup(getSchedulingIndexOfStage(stageId)); + scheduleNextScheduleGroup(getScheduleGroupOfStage(stageId)); break; case CONTAINER_FAILURE: LOG.info("Only the failed task will be retried."); @@ -513,7 +513,7 @@ private void onTaskExecutionFailedRecoverable(final String executorId, schedulerRunner.onAnExecutorAvailable(); } - private int getSchedulingIndexOfStage(final String stageId) { - return physicalPlan.getStageDAG().getVertexById(stageId).getScheduleGroupIndex(); + private int getScheduleGroupOfStage(final String stageId) { + return physicalPlan.getStageDAG().getVertexById(stageId).getScheduleGroup(); } } diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/CompositeSchedulingPolicy.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/CompositeSchedulingPolicy.java deleted file mode 100644 index 213f5f19..00000000 --- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/CompositeSchedulingPolicy.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Copyright (C) 2018 Seoul National University - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package edu.snu.nemo.runtime.master.scheduler; - -import edu.snu.nemo.runtime.common.plan.Task; -import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter; - -import javax.inject.Inject; -import java.util.Arrays; -import java.util.List; -import java.util.Set; - -/** - * Temporary class to implement stacked scheduling policy. - * At now, policies are injected through Tang, but have to be configurable by users - * when Nemo supports job-wide execution property. - * TODO #69: Support job-wide execution property. - */ -public final class CompositeSchedulingPolicy implements SchedulingPolicy { - private final List<SchedulingPolicy> schedulingPolicies; - - @Inject - private CompositeSchedulingPolicy(final SourceLocationAwareSchedulingPolicy sourceLocationAwareSchedulingPolicy, - final MinOccupancyFirstSchedulingPolicy minOccupancyFirstSchedulingPolicy, - final FreeSlotSchedulingPolicy freeSlotSchedulingPolicy, - final ContainerTypeAwareSchedulingPolicy containerTypeAwareSchedulingPolicy) { - schedulingPolicies = Arrays.asList( - freeSlotSchedulingPolicy, - containerTypeAwareSchedulingPolicy, - sourceLocationAwareSchedulingPolicy, - minOccupancyFirstSchedulingPolicy); - } - - @Override - public Set<ExecutorRepresenter> filterExecutorRepresenters(final Set<ExecutorRepresenter> executorRepresenterSet, - final Task task) { - Set<ExecutorRepresenter> candidates = executorRepresenterSet; - for (final SchedulingPolicy schedulingPolicy : schedulingPolicies) { - candidates = schedulingPolicy.filterExecutorRepresenters(candidates, task); - } - return candidates; - } -} diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicy.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingConstraint.java similarity index 52% rename from 
runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicy.java rename to runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingConstraint.java index 5f64e9c3..a91e996e 100644 --- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicy.java +++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingConstraint.java @@ -16,45 +16,29 @@ package edu.snu.nemo.runtime.master.scheduler; import com.google.common.annotations.VisibleForTesting; +import edu.snu.nemo.common.ir.executionproperty.AssociatedProperty; import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty; import edu.snu.nemo.runtime.common.plan.Task; import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter; import javax.inject.Inject; -import java.util.Set; -import java.util.stream.Collectors; /** * This policy find executors which has corresponding container type. */ -public final class ContainerTypeAwareSchedulingPolicy implements SchedulingPolicy { +@AssociatedProperty(ExecutorPlacementProperty.class) +public final class ContainerTypeAwareSchedulingConstraint implements SchedulingConstraint { @VisibleForTesting @Inject - public ContainerTypeAwareSchedulingPolicy() { + public ContainerTypeAwareSchedulingConstraint() { } - /** - * @param executorRepresenterSet Set of {@link ExecutorRepresenter} to be filtered by the container type. - * If the container type of target Task is NONE, it will return the original set. - * @param task {@link Task} to be scheduled. - * @return filtered Set of {@link ExecutorRepresenter}. 
- */ @Override - public Set<ExecutorRepresenter> filterExecutorRepresenters(final Set<ExecutorRepresenter> executorRepresenterSet, - final Task task) { - + public boolean testSchedulability(final ExecutorRepresenter executor, final Task task) { final String executorPlacementPropertyValue = task.getPropertyValue(ExecutorPlacementProperty.class) .orElse(ExecutorPlacementProperty.NONE); - if (executorPlacementPropertyValue.equals(ExecutorPlacementProperty.NONE)) { - return executorRepresenterSet; - } - - final Set<ExecutorRepresenter> candidateExecutors = - executorRepresenterSet.stream() - .filter(executor -> executor.getContainerType().equals(executorPlacementPropertyValue)) - .collect(Collectors.toSet()); - - return candidateExecutors; + return executorPlacementPropertyValue.equals(ExecutorPlacementProperty.NONE) ? true + : executor.getContainerType().equals(executorPlacementPropertyValue); } } diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingPolicy.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingConstraint.java similarity index 50% rename from runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingPolicy.java rename to runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingConstraint.java index d92d9d46..1fc1f6e6 100644 --- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingPolicy.java +++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingConstraint.java @@ -16,36 +16,29 @@ package edu.snu.nemo.runtime.master.scheduler; import com.google.common.annotations.VisibleForTesting; +import edu.snu.nemo.common.ir.executionproperty.AssociatedProperty; +import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorSlotComplianceProperty; import edu.snu.nemo.runtime.common.plan.Task; import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter; 
import javax.inject.Inject; -import java.util.Set; -import java.util.stream.Collectors; /** * This policy finds executor that has free slot for a Task. */ -public final class FreeSlotSchedulingPolicy implements SchedulingPolicy { +@AssociatedProperty(ExecutorSlotComplianceProperty.class) +public final class FreeSlotSchedulingConstraint implements SchedulingConstraint { @VisibleForTesting @Inject - public FreeSlotSchedulingPolicy() { + public FreeSlotSchedulingConstraint() { } - /** - * @param executorRepresenterSet Set of {@link ExecutorRepresenter} to be filtered by the free slot of executors. - * Executors that do not have any free slots will be filtered by this policy. - * @param task {@link Task} to be scheduled. - * @return filtered Set of {@link ExecutorRepresenter}. - */ @Override - public Set<ExecutorRepresenter> filterExecutorRepresenters(final Set<ExecutorRepresenter> executorRepresenterSet, - final Task task) { - final Set<ExecutorRepresenter> candidateExecutors = - executorRepresenterSet.stream() - .filter(executor -> executor.getRunningTasks().size() < executor.getExecutorCapacity()) - .collect(Collectors.toSet()); + public boolean testSchedulability(final ExecutorRepresenter executor, final Task task) { + if (!task.getPropertyValue(ExecutorSlotComplianceProperty.class).orElse(false)) { + return true; + } - return candidateExecutors; + return executor.getRunningTasks().size() < executor.getExecutorCapacity(); } } diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/MinOccupancyFirstSchedulingPolicy.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/MinOccupancyFirstSchedulingPolicy.java index 120e1fb0..e53f659a 100644 --- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/MinOccupancyFirstSchedulingPolicy.java +++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/MinOccupancyFirstSchedulingPolicy.java @@ -26,12 +26,7 @@ import org.slf4j.Logger; import 
org.slf4j.LoggerFactory; -import java.util.stream.Collectors; - /** - * {@inheritDoc} - * A scheduling policy used by {@link BatchSingleJobScheduler}. - * * This policy chooses a set of Executors, on which have minimum running Tasks. */ @ThreadSafe @@ -44,28 +39,20 @@ public MinOccupancyFirstSchedulingPolicy() { } - /** - * @param executorRepresenterSet Set of {@link ExecutorRepresenter} to be filtered by the occupancy of the Executors. - * @param task {@link Task} to be scheduled. - * @return filtered Set of {@link ExecutorRepresenter}. - */ @Override - public Set<ExecutorRepresenter> filterExecutorRepresenters(final Set<ExecutorRepresenter> executorRepresenterSet, - final Task task) { + public ExecutorRepresenter selectExecutor(final Collection<ExecutorRepresenter> executors, final Task task) { final OptionalInt minOccupancy = - executorRepresenterSet.stream() + executors.stream() .map(executor -> executor.getRunningTasks().size()) .mapToInt(i -> i).min(); if (!minOccupancy.isPresent()) { - return Collections.emptySet(); + throw new RuntimeException("Cannot find min occupancy"); } - final Set<ExecutorRepresenter> candidateExecutors = - executorRepresenterSet.stream() + return executors.stream() .filter(executor -> executor.getRunningTasks().size() == minOccupancy.getAsInt()) - .collect(Collectors.toSet()); - - return candidateExecutors; + .findFirst() + .orElseThrow(() -> new RuntimeException("No such executor")); } } diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java index 62af0408..42e9a2c4 100644 --- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java +++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulerRunner.java @@ -20,6 +20,7 @@ import edu.snu.nemo.runtime.common.state.TaskState; import edu.snu.nemo.runtime.master.JobStateManager; import 
edu.snu.nemo.runtime.master.resource.ExecutorRepresenter; +import org.apache.commons.lang3.mutable.MutableObject; import org.apache.reef.annotations.audience.DriverSide; import java.util.*; @@ -28,6 +29,7 @@ import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,12 +54,14 @@ private boolean isTerminated; private final DelayedSignalingCondition schedulingIteration = new DelayedSignalingCondition(); - private ExecutorRegistry executorRegistry; - private SchedulingPolicy schedulingPolicy; + private final ExecutorRegistry executorRegistry; + private final SchedulingConstraintRegistry schedulingConstraintRegistry; + private final SchedulingPolicy schedulingPolicy; @VisibleForTesting @Inject - public SchedulerRunner(final SchedulingPolicy schedulingPolicy, + public SchedulerRunner(final SchedulingConstraintRegistry schedulingConstraintRegistry, + final SchedulingPolicy schedulingPolicy, final PendingTaskCollectionPointer pendingTaskCollectionPointer, final ExecutorRegistry executorRegistry) { this.jobStateManagers = new HashMap<>(); @@ -67,6 +71,7 @@ public SchedulerRunner(final SchedulingPolicy schedulingPolicy, this.isTerminated = false; this.executorRegistry = executorRegistry; this.schedulingPolicy = schedulingPolicy; + this.schedulingConstraintRegistry = schedulingConstraintRegistry; } /** @@ -111,16 +116,23 @@ void doScheduleTaskList() { LOG.debug("Trying to schedule {}...", task.getTaskId()); executorRegistry.viewExecutors(executors -> { - final Set<ExecutorRepresenter> candidateExecutors = - schedulingPolicy.filterExecutorRepresenters(executors, task); - final Optional<ExecutorRepresenter> firstCandidate = candidateExecutors.stream().findFirst(); - - if (firstCandidate.isPresent()) { + final MutableObject<Set<ExecutorRepresenter>> candidateExecutors = new MutableObject<>(executors); + 
task.getExecutionProperties().forEachProperties(property -> { + final Optional<SchedulingConstraint> constraint = schedulingConstraintRegistry.get(property.getClass()); + if (constraint.isPresent() && !candidateExecutors.getValue().isEmpty()) { + candidateExecutors.setValue(candidateExecutors.getValue().stream() + .filter(e -> constraint.get().testSchedulability(e, task)) + .collect(Collectors.toSet())); + } + }); + if (!candidateExecutors.getValue().isEmpty()) { + // Select executor + final ExecutorRepresenter selectedExecutor + = schedulingPolicy.selectExecutor(candidateExecutors.getValue(), task); // update metadata first jobStateManager.onTaskStateChanged(task.getTaskId(), TaskState.State.EXECUTING); // send the task - final ExecutorRepresenter selectedExecutor = firstCandidate.get(); selectedExecutor.onTaskScheduled(task); } else { couldNotSchedule.add(task); diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingConstraint.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingConstraint.java new file mode 100644 index 00000000..7e713441 --- /dev/null +++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingConstraint.java @@ -0,0 +1,32 @@ +/* + * Copyright (C) 2018 Seoul National University + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package edu.snu.nemo.runtime.master.scheduler; + +import edu.snu.nemo.runtime.common.plan.Task; +import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter; +import org.apache.reef.annotations.audience.DriverSide; + +import javax.annotation.concurrent.ThreadSafe; + +/** + * Functions to test schedulability with a pair of an executor and a task. + */ +@DriverSide +@ThreadSafe +@FunctionalInterface +public interface SchedulingConstraint { + boolean testSchedulability(final ExecutorRepresenter executor, final Task task); +} diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingConstraintRegistry.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingConstraintRegistry.java new file mode 100644 index 00000000..97ab5544 --- /dev/null +++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingConstraintRegistry.java @@ -0,0 +1,74 @@ +/* + * Copyright (C) 2018 Seoul National University + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package edu.snu.nemo.runtime.master.scheduler; + +import edu.snu.nemo.common.ir.executionproperty.AssociatedProperty; +import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty; +import edu.snu.nemo.common.ir.executionproperty.VertexExecutionProperty; +import org.apache.reef.annotations.audience.DriverSide; + +import javax.annotation.concurrent.ThreadSafe; +import javax.inject.Inject; +import java.lang.reflect.Type; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Registry for {@link SchedulingConstraint}. + */ +@DriverSide +@ThreadSafe +public final class SchedulingConstraintRegistry { + private final Map<Type, SchedulingConstraint> typeToSchedulingConstraintMap = new ConcurrentHashMap<>(); + + @Inject + private SchedulingConstraintRegistry( + final ContainerTypeAwareSchedulingConstraint containerTypeAwareSchedulingConstraint, + final FreeSlotSchedulingConstraint freeSlotSchedulingConstraint, + final SourceLocationAwareSchedulingConstraint sourceLocationAwareSchedulingConstraint) { + registerSchedulingConstraint(containerTypeAwareSchedulingConstraint); + registerSchedulingConstraint(freeSlotSchedulingConstraint); + registerSchedulingConstraint(sourceLocationAwareSchedulingConstraint); + } + + /** + * Registers a {@link SchedulingConstraint}. + * @param policy the policy to register + */ + public void registerSchedulingConstraint(final SchedulingConstraint policy) { + final AssociatedProperty associatedProperty = policy.getClass().getAnnotation(AssociatedProperty.class); + if (associatedProperty == null || associatedProperty.value() == null) { + throw new RuntimeException(String.format("SchedulingConstraint %s has no associated VertexExecutionProperty", + policy.getClass())); + } + final Class<? 
extends ExecutionProperty> property = associatedProperty.value(); + if (typeToSchedulingConstraintMap.putIfAbsent(property, policy) != null) { + throw new RuntimeException(String.format("Multiple SchedulingConstraint for VertexExecutionProperty %s:" + + "%s, %s", property, typeToSchedulingConstraintMap.get(property), policy)); + } + } + + /** + * Returns {@link SchedulingConstraint} for the given {@link VertexExecutionProperty}. + * @param propertyClass {@link VertexExecutionProperty} class + * @return the corresponding {@link SchedulingConstraint} object, + * or {@link Optional#EMPTY} if no such policy was found + */ + public Optional<SchedulingConstraint> get(final Class<? extends VertexExecutionProperty> propertyClass) { + return Optional.ofNullable(typeToSchedulingConstraintMap.get(propertyClass)); + } +} diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingPolicy.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingPolicy.java index 0064310c..95288e7e 100644 --- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingPolicy.java +++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingPolicy.java @@ -17,20 +17,27 @@ import edu.snu.nemo.runtime.common.plan.Task; import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter; +import net.jcip.annotations.ThreadSafe; import org.apache.reef.annotations.audience.DriverSide; import org.apache.reef.tang.annotations.DefaultImplementation; -import javax.annotation.concurrent.ThreadSafe; -import java.util.Set; +import java.util.Collection; /** - * (WARNING) Implementations of this interface must be thread-safe. + * A function to select an executor from collection of available executors. 
*/ @DriverSide @ThreadSafe @FunctionalInterface -@DefaultImplementation(CompositeSchedulingPolicy.class) +@DefaultImplementation(MinOccupancyFirstSchedulingPolicy.class) public interface SchedulingPolicy { - Set<ExecutorRepresenter> filterExecutorRepresenters(final Set<ExecutorRepresenter> executorRepresenterSet, - final Task task); + /** + * A function to select an executor from the specified collection of available executors. + * + * @param executors The collection of available executors. + * Implementations can assume that the collection is not empty. + * @param task The task to schedule + * @return The selected executor. It must be a member of {@code executors}. + */ + ExecutorRepresenter selectExecutor(final Collection<ExecutorRepresenter> executors, final Task task); } diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicy.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingConstraint.java similarity index 67% rename from runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicy.java rename to runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingConstraint.java index f84f8a85..f18c9008 100644 --- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicy.java +++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingConstraint.java @@ -17,6 +17,8 @@ import com.google.common.annotations.VisibleForTesting; import edu.snu.nemo.common.ir.Readable; +import edu.snu.nemo.common.ir.executionproperty.AssociatedProperty; +import edu.snu.nemo.common.ir.vertex.executionproperty.SourceLocationAwareSchedulingProperty; import edu.snu.nemo.runtime.common.plan.Task; import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter; import org.apache.reef.annotations.audience.DriverSide; @@ 
-26,21 +28,21 @@ import javax.annotation.concurrent.ThreadSafe; import javax.inject.Inject; import java.util.*; -import java.util.stream.Collectors; /** * This policy is same as {@link MinOccupancyFirstSchedulingPolicy}, however for Tasks * with {@link edu.snu.nemo.common.ir.vertex.SourceVertex}, it tries to pick one of the executors - * where the corresponding data resides. + * where the corresponding data reside. */ @ThreadSafe @DriverSide -public final class SourceLocationAwareSchedulingPolicy implements SchedulingPolicy { - private static final Logger LOG = LoggerFactory.getLogger(SourceLocationAwareSchedulingPolicy.class); +@AssociatedProperty(SourceLocationAwareSchedulingProperty.class) +public final class SourceLocationAwareSchedulingConstraint implements SchedulingConstraint { + private static final Logger LOG = LoggerFactory.getLogger(SourceLocationAwareSchedulingConstraint.class); @VisibleForTesting @Inject - public SourceLocationAwareSchedulingPolicy() { + public SourceLocationAwareSchedulingConstraint() { } /** @@ -56,33 +58,21 @@ public SourceLocationAwareSchedulingPolicy() { return new HashSet<>(sourceLocations); } - /** - * @param executorRepresenterSet Set of {@link ExecutorRepresenter} to be filtered by source location. - * If there is no source locations, will return original set. - * @param task {@link Task} to be scheduled. - * @return filtered Set of {@link ExecutorRepresenter}. 
- */ @Override - public Set<ExecutorRepresenter> filterExecutorRepresenters(final Set<ExecutorRepresenter> executorRepresenterSet, - final Task task) { + public boolean testSchedulability(final ExecutorRepresenter executor, final Task task) { final Set<String> sourceLocations; try { sourceLocations = getSourceLocations(task.getIrVertexIdToReadable().values()); } catch (final UnsupportedOperationException e) { - return executorRepresenterSet; + return true; } catch (final Exception e) { throw new RuntimeException(e); } if (sourceLocations.size() == 0) { - return executorRepresenterSet; + return true; } - final Set<ExecutorRepresenter> candidateExecutors = - executorRepresenterSet.stream() - .filter(executor -> sourceLocations.contains(executor.getNodeName())) - .collect(Collectors.toSet()); - - return candidateExecutors; + return sourceLocations.contains(executor.getNodeName()); } } diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobSchedulerTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobSchedulerTest.java index ba8a2f5e..8bfe43d2 100644 --- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobSchedulerTest.java +++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/BatchSingleJobSchedulerTest.java @@ -63,6 +63,7 @@ public final class BatchSingleJobSchedulerTest { private static final Logger LOG = LoggerFactory.getLogger(BatchSingleJobSchedulerTest.class.getName()); private Scheduler scheduler; + private SchedulingConstraintRegistry schedulingConstraint; private SchedulingPolicy schedulingPolicy; private SchedulerRunner schedulerRunner; private ExecutorRegistry executorRegistry; @@ -86,8 +87,9 @@ public void setUp() throws Exception { executorRegistry = injector.getInstance(ExecutorRegistry.class); metricMessageHandler = mock(MetricMessageHandler.class); pendingTaskCollectionPointer = new PendingTaskCollectionPointer(); - 
schedulingPolicy = injector.getInstance(CompositeSchedulingPolicy.class); - schedulerRunner = new SchedulerRunner(schedulingPolicy, pendingTaskCollectionPointer, executorRegistry); + schedulingConstraint = injector.getInstance(SchedulingConstraintRegistry.class); + schedulingPolicy = injector.getInstance(SchedulingPolicy.class); + schedulerRunner = new SchedulerRunner(schedulingConstraint, schedulingPolicy, pendingTaskCollectionPointer, executorRegistry); pubSubEventHandler = mock(PubSubEventHandlerWrapper.class); updatePhysicalPlanEventHandler = mock(UpdatePhysicalPlanEventHandler.class); scheduler = @@ -154,7 +156,7 @@ private void scheduleAndCheckJobTermination(final PhysicalPlan plan) throws Inje // b) the stages of the next ScheduleGroup are scheduled after the stages of each ScheduleGroup are made "complete". for (int i = 0; i < getNumScheduleGroups(plan.getStageDAG()); i++) { final int scheduleGroupIdx = i; - final List<Stage> stages = filterStagesWithAScheduleGroupIndex(plan.getStageDAG(), scheduleGroupIdx); + final List<Stage> stages = filterStagesWithAScheduleGroup(plan.getStageDAG(), scheduleGroupIdx); LOG.debug("Checking that all stages of ScheduleGroup {} enter the executing state", scheduleGroupIdx); stages.forEach(stage -> { @@ -175,10 +177,10 @@ private void scheduleAndCheckJobTermination(final PhysicalPlan plan) throws Inje assertTrue(jobStateManager.isJobDone()); } - private List<Stage> filterStagesWithAScheduleGroupIndex( - final DAG<Stage, StageEdge> physicalDAG, final int scheduleGroupIndex) { + private List<Stage> filterStagesWithAScheduleGroup( + final DAG<Stage, StageEdge> physicalDAG, final int scheduleGroup) { final Set<Stage> stageSet = new HashSet<>(physicalDAG.filterVertices( - stage -> stage.getScheduleGroupIndex() == scheduleGroupIndex)); + stage -> stage.getScheduleGroup() == scheduleGroup)); // Return the filtered vertices as a sorted list final List<Stage> sortedStages = new ArrayList<>(stageSet.size()); @@ -192,7 +194,7 @@ private 
void scheduleAndCheckJobTermination(final PhysicalPlan plan) throws Inje private int getNumScheduleGroups(final DAG<Stage, StageEdge> physicalDAG) { final Set<Integer> scheduleGroupSet = new HashSet<>(); - physicalDAG.getVertices().forEach(stage -> scheduleGroupSet.add(stage.getScheduleGroupIndex())); + physicalDAG.getVertices().forEach(stage -> scheduleGroupSet.add(stage.getScheduleGroup())); return scheduleGroupSet.size(); } } diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicyTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingConstraintTest.java similarity index 78% rename from runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicyTest.java rename to runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingConstraintTest.java index 7a2d1490..4aa2ecca 100644 --- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingPolicyTest.java +++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/ContainerTypeAwareSchedulingConstraintTest.java @@ -24,16 +24,17 @@ import org.powermock.modules.junit4.PowerMockRunner; import java.util.*; +import java.util.stream.Collectors; import static org.junit.Assert.assertEquals; import static org.mockito.Mockito.*; /** - * Tests {@link ContainerTypeAwareSchedulingPolicy}. + * Tests {@link ContainerTypeAwareSchedulingConstraint}. 
*/ @RunWith(PowerMockRunner.class) @PrepareForTest({ExecutorRepresenter.class, Task.class}) -public final class ContainerTypeAwareSchedulingPolicyTest { +public final class ContainerTypeAwareSchedulingConstraintTest { private static ExecutorRepresenter mockExecutorRepresenter(final String containerType) { final ExecutorRepresenter executorRepresenter = mock(ExecutorRepresenter.class); @@ -43,7 +44,7 @@ private static ExecutorRepresenter mockExecutorRepresenter(final String containe @Test public void testContainerTypeAware() { - final SchedulingPolicy schedulingPolicy = new ContainerTypeAwareSchedulingPolicy(); + final SchedulingConstraint schedulingConstraint = new ContainerTypeAwareSchedulingConstraint(); final ExecutorRepresenter a0 = mockExecutorRepresenter(ExecutorPlacementProperty.TRANSIENT); final ExecutorRepresenter a1 = mockExecutorRepresenter(ExecutorPlacementProperty.RESERVED); final ExecutorRepresenter a2 = mockExecutorRepresenter(ExecutorPlacementProperty.NONE); @@ -54,10 +55,11 @@ public void testContainerTypeAware() { final Set<ExecutorRepresenter> executorRepresenterList1 = new HashSet<>(Arrays.asList(a0, a1, a2)); - final Set<ExecutorRepresenter> candidateExecutors1 = - schedulingPolicy.filterExecutorRepresenters(executorRepresenterList1, task1); + final Set<ExecutorRepresenter> candidateExecutors1 = executorRepresenterList1.stream() + .filter(e -> schedulingConstraint.testSchedulability(e, task1)) + .collect(Collectors.toSet());; - final Set<ExecutorRepresenter> expectedExecutors1 = new HashSet<>(Arrays.asList(a1)); + final Set<ExecutorRepresenter> expectedExecutors1 = Collections.singleton(a1); assertEquals(expectedExecutors1, candidateExecutors1); final Task task2 = mock(Task.class); @@ -66,8 +68,9 @@ public void testContainerTypeAware() { final Set<ExecutorRepresenter> executorRepresenterList2 = new HashSet<>(Arrays.asList(a0, a1, a2)); - final Set<ExecutorRepresenter> candidateExecutors2 = - 
schedulingPolicy.filterExecutorRepresenters(executorRepresenterList2, task2); + final Set<ExecutorRepresenter> candidateExecutors2 = executorRepresenterList2.stream() + .filter(e -> schedulingConstraint.testSchedulability(e, task2)) + .collect(Collectors.toSet()); final Set<ExecutorRepresenter> expectedExecutors2 = new HashSet<>(Arrays.asList(a0, a1, a2)); assertEquals(expectedExecutors2, candidateExecutors2); diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FaultToleranceTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FaultToleranceTest.java deleted file mode 100644 index 62b51812..00000000 --- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FaultToleranceTest.java +++ /dev/null @@ -1,379 +0,0 @@ -/* - * Copyright (C) 2018 Seoul National University - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package edu.snu.nemo.runtime.master.scheduler; - -import edu.snu.nemo.common.eventhandler.PubSubEventHandlerWrapper; -import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty; -import edu.snu.nemo.runtime.common.comm.ControlMessage; -import edu.snu.nemo.runtime.common.message.MessageSender; -import edu.snu.nemo.runtime.common.plan.PhysicalPlan; -import edu.snu.nemo.runtime.common.plan.Stage; -import edu.snu.nemo.runtime.common.state.TaskState; -import edu.snu.nemo.runtime.master.JobStateManager; -import edu.snu.nemo.runtime.master.MetricMessageHandler; -import edu.snu.nemo.runtime.master.BlockManagerMaster; -import edu.snu.nemo.runtime.master.eventhandler.UpdatePhysicalPlanEventHandler; -import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter; -import edu.snu.nemo.runtime.master.resource.ResourceSpecification; -import edu.snu.nemo.runtime.plangenerator.TestPlanGenerator; -import org.apache.reef.driver.context.ActiveContext; -import org.apache.reef.tang.Injector; -import org.apache.reef.tang.Tang; -import org.apache.reef.tang.exceptions.InjectionException; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.Mockito; -import org.powermock.core.classloader.annotations.PrepareForTest; -import org.powermock.modules.junit4.PowerMockRunner; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.*; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.function.Function; - -import static edu.snu.nemo.runtime.common.state.StageState.State.COMPLETE; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.mock; - -/** - * Tests fault tolerance. 
- */ -@RunWith(PowerMockRunner.class) -@PrepareForTest({BlockManagerMaster.class, SchedulerRunner.class, - PubSubEventHandlerWrapper.class, UpdatePhysicalPlanEventHandler.class, MetricMessageHandler.class}) -public final class FaultToleranceTest { - private static final Logger LOG = LoggerFactory.getLogger(FaultToleranceTest.class.getName()); - - private SchedulingPolicy schedulingPolicy; - private SchedulerRunner schedulerRunner; - private ExecutorRegistry executorRegistry; - - private MetricMessageHandler metricMessageHandler; - private PendingTaskCollectionPointer pendingTaskCollectionPointer; - private PubSubEventHandlerWrapper pubSubEventHandler; - private UpdatePhysicalPlanEventHandler updatePhysicalPlanEventHandler; - private BlockManagerMaster blockManagerMaster = mock(BlockManagerMaster.class); - private final MessageSender<ControlMessage.Message> mockMsgSender = mock(MessageSender.class); - private final ExecutorService serExecutorService = Executors.newSingleThreadExecutor(); - - private static final int MAX_SCHEDULE_ATTEMPT = Integer.MAX_VALUE; - - @Before - public void setUp() throws Exception { - metricMessageHandler = mock(MetricMessageHandler.class); - pubSubEventHandler = mock(PubSubEventHandlerWrapper.class); - updatePhysicalPlanEventHandler = mock(UpdatePhysicalPlanEventHandler.class); - - } - - private Scheduler setUpScheduler(final boolean useMockSchedulerRunner) throws InjectionException { - final Injector injector = Tang.Factory.getTang().newInjector(); - executorRegistry = injector.getInstance(ExecutorRegistry.class); - - pendingTaskCollectionPointer = new PendingTaskCollectionPointer(); - schedulingPolicy = injector.getInstance(CompositeSchedulingPolicy.class); - - if (useMockSchedulerRunner) { - schedulerRunner = mock(SchedulerRunner.class); - } else { - schedulerRunner = new SchedulerRunner(schedulingPolicy, pendingTaskCollectionPointer, executorRegistry); - } - return new BatchSingleJobScheduler(schedulerRunner, 
pendingTaskCollectionPointer, blockManagerMaster, - pubSubEventHandler, updatePhysicalPlanEventHandler, executorRegistry); - } - - /** - * Tests fault tolerance after a container removal. - */ - @Test(timeout=50000) - public void testContainerRemoval() throws Exception { - final ActiveContext activeContext = mock(ActiveContext.class); - Mockito.doThrow(new RuntimeException()).when(activeContext).close(); - - final ResourceSpecification computeSpec = new ResourceSpecification(ExecutorPlacementProperty.COMPUTE, 2, 0); - final Function<String, ExecutorRepresenter> executorRepresenterGenerator = executorId -> - new ExecutorRepresenter(executorId, computeSpec, mockMsgSender, activeContext, serExecutorService, executorId); - final ExecutorRepresenter a3 = executorRepresenterGenerator.apply("a3"); - final ExecutorRepresenter a2 = executorRepresenterGenerator.apply("a2"); - final ExecutorRepresenter a1 = executorRepresenterGenerator.apply("a1"); - - final List<ExecutorRepresenter> executors = new ArrayList<>(); - executors.add(a1); - executors.add(a2); - executors.add(a3); - - final Scheduler scheduler = setUpScheduler(true); - for (final ExecutorRepresenter executor : executors) { - scheduler.onExecutorAdded(executor); - } - - final PhysicalPlan plan = - TestPlanGenerator.generatePhysicalPlan(TestPlanGenerator.PlanType.TwoVerticesJoined, false); - - final JobStateManager jobStateManager = - new JobStateManager(plan, blockManagerMaster, metricMessageHandler, MAX_SCHEDULE_ATTEMPT); - scheduler.scheduleJob(plan, jobStateManager); - - final List<Stage> dagOf4Stages = plan.getStageDAG().getTopologicalSort(); - - for (final Stage stage : dagOf4Stages) { - if (stage.getScheduleGroupIndex() == 0 || stage.getScheduleGroupIndex() == 1) { - - // There are 3 executors, each of capacity 2, and there are 6 Tasks in ScheduleGroup 0 and 1. 
- SchedulerTestUtil.mockSchedulingBySchedulerRunner(pendingTaskCollectionPointer, schedulingPolicy, jobStateManager, - executorRegistry, false); - stage.getTaskIds().forEach(taskId -> - SchedulerTestUtil.sendTaskStateEventToScheduler(scheduler, executorRegistry, - taskId, TaskState.State.COMPLETE, 1)); - } else if (stage.getScheduleGroupIndex() == 2) { - scheduler.onExecutorRemoved("a3"); - // There are 2 executors, each of capacity 2, and there are 2 Tasks in ScheduleGroup 2. - SchedulerTestUtil.mockSchedulingBySchedulerRunner(pendingTaskCollectionPointer, schedulingPolicy, jobStateManager, - executorRegistry, false); - - // Due to round robin scheduling, "a2" is assured to have a running Task. - scheduler.onExecutorRemoved("a2"); - - // Re-schedule - SchedulerTestUtil.mockSchedulingBySchedulerRunner(pendingTaskCollectionPointer, schedulingPolicy, jobStateManager, - executorRegistry, false); - - final Optional<Integer> maxTaskAttempt = stage.getTaskIds().stream() - .map(jobStateManager::getTaskAttempt).max(Integer::compareTo); - assertTrue(maxTaskAttempt.isPresent()); - assertEquals(2, (int) maxTaskAttempt.get()); - - SchedulerTestUtil.mockSchedulingBySchedulerRunner(pendingTaskCollectionPointer, schedulingPolicy, jobStateManager, - executorRegistry, false); - stage.getTaskIds().forEach(taskId -> - SchedulerTestUtil.sendTaskStateEventToScheduler(scheduler, executorRegistry, - taskId, TaskState.State.COMPLETE, 1)); - } else if (stage.getScheduleGroupIndex() == 3) { - // There are 1 executors, each of capacity 2, and there are 2 Tasks in ScheduleGroup 3. - // Schedule only the first Task - SchedulerTestUtil.mockSchedulingBySchedulerRunner(pendingTaskCollectionPointer, schedulingPolicy, jobStateManager, - executorRegistry, true); - } else { - throw new RuntimeException(String.format("Unexpected ScheduleGroupIndex: %d", - stage.getScheduleGroupIndex())); - } - } - } - - /** - * Tests fault tolerance after an output write failure. 
- */ - @Test(timeout=50000) - public void testOutputFailure() throws Exception { - final ActiveContext activeContext = mock(ActiveContext.class); - Mockito.doThrow(new RuntimeException()).when(activeContext).close(); - - final ResourceSpecification computeSpec = new ResourceSpecification(ExecutorPlacementProperty.COMPUTE, 2, 0); - final Function<String, ExecutorRepresenter> executorRepresenterGenerator = executorId -> - new ExecutorRepresenter(executorId, computeSpec, mockMsgSender, activeContext, serExecutorService, executorId); - final ExecutorRepresenter a3 = executorRepresenterGenerator.apply("a3"); - final ExecutorRepresenter a2 = executorRepresenterGenerator.apply("a2"); - final ExecutorRepresenter a1 = executorRepresenterGenerator.apply("a1"); - - final List<ExecutorRepresenter> executors = new ArrayList<>(); - executors.add(a1); - executors.add(a2); - executors.add(a3); - final Scheduler scheduler = setUpScheduler(true); - for (final ExecutorRepresenter executor : executors) { - scheduler.onExecutorAdded(executor); - } - - final PhysicalPlan plan = - TestPlanGenerator.generatePhysicalPlan(TestPlanGenerator.PlanType.TwoVerticesJoined, false); - final JobStateManager jobStateManager = - new JobStateManager(plan, blockManagerMaster, metricMessageHandler, MAX_SCHEDULE_ATTEMPT); - scheduler.scheduleJob(plan, jobStateManager); - - final List<Stage> dagOf4Stages = plan.getStageDAG().getTopologicalSort(); - - for (final Stage stage : dagOf4Stages) { - if (stage.getScheduleGroupIndex() == 0 || stage.getScheduleGroupIndex() == 1) { - - // There are 3 executors, each of capacity 2, and there are 6 Tasks in ScheduleGroup 0 and 1. 
- SchedulerTestUtil.mockSchedulingBySchedulerRunner(pendingTaskCollectionPointer, schedulingPolicy, jobStateManager, - executorRegistry, false); - stage.getTaskIds().forEach(taskId -> - SchedulerTestUtil.sendTaskStateEventToScheduler(scheduler, executorRegistry, - taskId, TaskState.State.COMPLETE, 1)); - } else if (stage.getScheduleGroupIndex() == 2) { - // There are 3 executors, each of capacity 2, and there are 2 Tasks in ScheduleGroup 2. - SchedulerTestUtil.mockSchedulingBySchedulerRunner(pendingTaskCollectionPointer, schedulingPolicy, jobStateManager, - executorRegistry, false); - stage.getTaskIds().forEach(taskId -> - SchedulerTestUtil.sendTaskStateEventToScheduler(scheduler, executorRegistry, - taskId, TaskState.State.FAILED_RECOVERABLE, 1, - TaskState.RecoverableFailureCause.OUTPUT_WRITE_FAILURE)); - - // Re-schedule - SchedulerTestUtil.mockSchedulingBySchedulerRunner(pendingTaskCollectionPointer, schedulingPolicy, jobStateManager, - executorRegistry, false); - - final Optional<Integer> maxTaskAttempt = stage.getTaskIds().stream() - .map(jobStateManager::getTaskAttempt).max(Integer::compareTo); - assertTrue(maxTaskAttempt.isPresent()); - assertEquals(2, (int) maxTaskAttempt.get()); - - stage.getTaskIds().forEach(taskId -> - assertEquals(TaskState.State.EXECUTING, jobStateManager.getTaskState(taskId))); - } - } - } - - /** - * Tests fault tolerance after an input read failure. 
- */ - @Test(timeout=50000) - public void testInputReadFailure() throws Exception { - final ActiveContext activeContext = mock(ActiveContext.class); - Mockito.doThrow(new RuntimeException()).when(activeContext).close(); - - final ResourceSpecification computeSpec = new ResourceSpecification(ExecutorPlacementProperty.COMPUTE, 2, 0); - final Function<String, ExecutorRepresenter> executorRepresenterGenerator = executorId -> - new ExecutorRepresenter(executorId, computeSpec, mockMsgSender, activeContext, serExecutorService, executorId); - final ExecutorRepresenter a3 = executorRepresenterGenerator.apply("a3"); - final ExecutorRepresenter a2 = executorRepresenterGenerator.apply("a2"); - final ExecutorRepresenter a1 = executorRepresenterGenerator.apply("a1"); - - final List<ExecutorRepresenter> executors = new ArrayList<>(); - executors.add(a1); - executors.add(a2); - executors.add(a3); - final Scheduler scheduler = setUpScheduler(true); - for (final ExecutorRepresenter executor : executors) { - scheduler.onExecutorAdded(executor); - } - - final PhysicalPlan plan = - TestPlanGenerator.generatePhysicalPlan(TestPlanGenerator.PlanType.TwoVerticesJoined, false); - final JobStateManager jobStateManager = - new JobStateManager(plan, blockManagerMaster, metricMessageHandler, MAX_SCHEDULE_ATTEMPT); - scheduler.scheduleJob(plan, jobStateManager); - - final List<Stage> dagOf4Stages = plan.getStageDAG().getTopologicalSort(); - - for (final Stage stage : dagOf4Stages) { - if (stage.getScheduleGroupIndex() == 0 || stage.getScheduleGroupIndex() == 1) { - - // There are 3 executors, each of capacity 2, and there are 6 Tasks in ScheduleGroup 0 and 1. 
- SchedulerTestUtil.mockSchedulingBySchedulerRunner(pendingTaskCollectionPointer, schedulingPolicy, jobStateManager, - executorRegistry, false); - stage.getTaskIds().forEach(taskId -> - SchedulerTestUtil.sendTaskStateEventToScheduler(scheduler, executorRegistry, - taskId, TaskState.State.COMPLETE, 1)); - } else if (stage.getScheduleGroupIndex() == 2) { - // There are 3 executors, each of capacity 2, and there are 2 Tasks in ScheduleGroup 2. - SchedulerTestUtil.mockSchedulingBySchedulerRunner(pendingTaskCollectionPointer, schedulingPolicy, jobStateManager, - executorRegistry, false); - - stage.getTaskIds().forEach(taskId -> - SchedulerTestUtil.sendTaskStateEventToScheduler(scheduler, executorRegistry, - taskId, TaskState.State.FAILED_RECOVERABLE, 1, - TaskState.RecoverableFailureCause.INPUT_READ_FAILURE)); - - // Re-schedule - SchedulerTestUtil.mockSchedulingBySchedulerRunner(pendingTaskCollectionPointer, schedulingPolicy, jobStateManager, - executorRegistry, false); - - final Optional<Integer> maxTaskAttempt = stage.getTaskIds().stream() - .map(jobStateManager::getTaskAttempt).max(Integer::compareTo); - assertTrue(maxTaskAttempt.isPresent()); - assertEquals(2, (int) maxTaskAttempt.get()); - - stage.getTaskIds().forEach(taskId -> - assertEquals(TaskState.State.EXECUTING, jobStateManager.getTaskState(taskId))); - } - } - } - - /** - * Tests the rescheduling of Tasks upon a failure. 
- */ - @Test(timeout=200000) - public void testTaskReexecutionForFailure() throws Exception { - final ActiveContext activeContext = mock(ActiveContext.class); - Mockito.doThrow(new RuntimeException()).when(activeContext).close(); - - final ResourceSpecification computeSpec = new ResourceSpecification(ExecutorPlacementProperty.COMPUTE, 2, 0); - final Function<String, ExecutorRepresenter> executorRepresenterGenerator = executorId -> - new ExecutorRepresenter(executorId, computeSpec, mockMsgSender, activeContext, serExecutorService, executorId); - - final Scheduler scheduler = setUpScheduler(false); - final PhysicalPlan plan = - TestPlanGenerator.generatePhysicalPlan(TestPlanGenerator.PlanType.TwoVerticesJoined, false); - final JobStateManager jobStateManager = - new JobStateManager(plan, blockManagerMaster, metricMessageHandler, MAX_SCHEDULE_ATTEMPT); - scheduler.scheduleJob(plan, jobStateManager); - - final List<ExecutorRepresenter> executors = new ArrayList<>(); - final List<Stage> dagOf4Stages = plan.getStageDAG().getTopologicalSort(); - - int executorIdIndex = 1; - float removalChance = 0.5f; // Out of 1.0 - final Random random = new Random(0); // Deterministic seed. - - for (final Stage stage : dagOf4Stages) { - - while (jobStateManager.getStageState(stage.getId()) != COMPLETE) { - // By chance, remove or add executor - if (isTrueByChance(random, removalChance)) { - // REMOVE EXECUTOR - if (!executors.isEmpty()) { - scheduler.onExecutorRemoved(executors.remove(random.nextInt(executors.size())).getExecutorId()); - } else { - // Skip, since no executor is running. 
- } - } else { - if (executors.size() < 3) { - // ADD EXECUTOR - final ExecutorRepresenter newExecutor = executorRepresenterGenerator.apply("a" + executorIdIndex); - executorIdIndex += 1; - executors.add(newExecutor); - scheduler.onExecutorAdded(newExecutor); - } else { - // Skip, in order to keep the total number of running executors below or equal to 3 - } - } - - // Complete the execution of tasks - if (!executors.isEmpty()) { - final int indexOfCompletedExecutor = random.nextInt(executors.size()); - // New set for snapshotting - final Map<String, Integer> runningTaskSnapshot = - new HashMap<>(executors.get(indexOfCompletedExecutor).getRunningTaskToAttempt()); - runningTaskSnapshot.entrySet().forEach(entry -> { - SchedulerTestUtil.sendTaskStateEventToScheduler( - scheduler, executorRegistry, entry.getKey(), TaskState.State.COMPLETE, entry.getValue()); - }); - } - } - } - assertTrue(jobStateManager.isJobDone()); - } - - private boolean isTrueByChance(final Random random, final float chance) { - return chance > random.nextDouble(); - } -} diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingPolicyTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingConstraintTest.java similarity index 75% rename from runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingPolicyTest.java rename to runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingConstraintTest.java index c60eeb3a..f2dd8785 100644 --- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingPolicyTest.java +++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingConstraintTest.java @@ -15,6 +15,7 @@ */ package edu.snu.nemo.runtime.master.scheduler; +import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorSlotComplianceProperty; import edu.snu.nemo.runtime.common.plan.Task; import 
edu.snu.nemo.runtime.master.resource.ExecutorRepresenter; import org.junit.Test; @@ -23,17 +24,18 @@ import org.powermock.modules.junit4.PowerMockRunner; import java.util.*; +import java.util.stream.Collectors; import java.util.stream.IntStream; import static org.junit.Assert.assertEquals; import static org.mockito.Mockito.*; /** - * Tests {@link FreeSlotSchedulingPolicy}. + * Tests {@link FreeSlotSchedulingConstraint}. */ @RunWith(PowerMockRunner.class) @PrepareForTest({ExecutorRepresenter.class, Task.class}) -public final class FreeSlotSchedulingPolicyTest { +public final class FreeSlotSchedulingConstraintTest { private static ExecutorRepresenter mockExecutorRepresenter(final int numRunningTasks, final int capacity) { @@ -47,18 +49,20 @@ private static ExecutorRepresenter mockExecutorRepresenter(final int numRunningT @Test public void testFreeSlot() { - final SchedulingPolicy schedulingPolicy = new FreeSlotSchedulingPolicy(); + final SchedulingConstraint schedulingConstraint = new FreeSlotSchedulingConstraint(); final ExecutorRepresenter a0 = mockExecutorRepresenter(1, 1); final ExecutorRepresenter a1 = mockExecutorRepresenter(2, 3); final Task task = mock(Task.class); + when(task.getPropertyValue(ExecutorSlotComplianceProperty.class)).thenReturn(Optional.of(true)); final Set<ExecutorRepresenter> executorRepresenterList = new HashSet<>(Arrays.asList(a0, a1)); - final Set<ExecutorRepresenter> candidateExecutors = - schedulingPolicy.filterExecutorRepresenters(executorRepresenterList, task); + final Set<ExecutorRepresenter> candidateExecutors = executorRepresenterList.stream() + .filter(e -> schedulingConstraint.testSchedulability(e, task)) + .collect(Collectors.toSet()); - final Set<ExecutorRepresenter> expectedExecutors = new HashSet<>(Arrays.asList(a1)); + final Set<ExecutorRepresenter> expectedExecutors = Collections.singleton(a1); assertEquals(expectedExecutors, candidateExecutors); } } diff --git 
a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/MinOccupancyFirstSchedulingPolicyTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/MinOccupancyFirstSchedulingPolicyTest.java index 60311a63..bf6ebc8a 100644 --- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/MinOccupancyFirstSchedulingPolicyTest.java +++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/MinOccupancyFirstSchedulingPolicyTest.java @@ -44,7 +44,7 @@ private static ExecutorRepresenter mockExecutorRepresenter(final int numRunningT } @Test - public void testRoundRobin() { + public void test() { final SchedulingPolicy schedulingPolicy = new MinOccupancyFirstSchedulingPolicy(); final ExecutorRepresenter a0 = mockExecutorRepresenter(1); final ExecutorRepresenter a1 = mockExecutorRepresenter(2); @@ -52,13 +52,9 @@ public void testRoundRobin() { final Task task = mock(Task.class); - final Set<ExecutorRepresenter> executorRepresenterList = new HashSet<>(Arrays.asList(a0, a1, a2)); + final List<ExecutorRepresenter> executorRepresenterList = Arrays.asList(a0, a1, a2); - final Set<ExecutorRepresenter> candidateExecutors = - schedulingPolicy.filterExecutorRepresenters(executorRepresenterList, task); - - final Set<ExecutorRepresenter> expectedExecutors = new HashSet<>(Arrays.asList(a0)); - assertEquals(expectedExecutors, candidateExecutors); + assertEquals(a0, schedulingPolicy.selectExecutor(executorRepresenterList, task)); } } diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SchedulerTestUtil.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SchedulerTestUtil.java index 41f9642c..387c6b90 100644 --- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SchedulerTestUtil.java +++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SchedulerTestUtil.java @@ -16,14 +16,11 @@ package edu.snu.nemo.runtime.master.scheduler; import 
edu.snu.nemo.runtime.common.plan.Stage; -import edu.snu.nemo.runtime.common.plan.Task; import edu.snu.nemo.runtime.common.state.StageState; import edu.snu.nemo.runtime.common.state.TaskState; import edu.snu.nemo.runtime.master.JobStateManager; import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter; -import java.util.Collection; -import java.util.List; import java.util.Optional; /** @@ -94,23 +91,4 @@ static void sendTaskStateEventToScheduler(final Scheduler scheduler, scheduler.onTaskStateReportFromExecutor(scheduledExecutor.getExecutorId(), taskId, attemptIdx, newState, null, cause); } - - static void sendTaskStateEventToScheduler(final Scheduler scheduler, - final ExecutorRegistry executorRegistry, - final String taskId, - final TaskState.State newState, - final int attemptIdx) { - sendTaskStateEventToScheduler(scheduler, executorRegistry, taskId, newState, attemptIdx, null); - } - - static void mockSchedulingBySchedulerRunner(final PendingTaskCollectionPointer pendingTaskCollectionPointer, - final SchedulingPolicy schedulingPolicy, - final JobStateManager jobStateManager, - final ExecutorRegistry executorRegistry, - final boolean scheduleOnlyTheFirstStage) { - final SchedulerRunner schedulerRunner = - new SchedulerRunner(schedulingPolicy, pendingTaskCollectionPointer, executorRegistry); - schedulerRunner.scheduleJob(jobStateManager); - schedulerRunner.doScheduleTaskList(); - } } diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicyTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingConstraintTest.java similarity index 83% rename from runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicyTest.java rename to runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingConstraintTest.java index 2f538ee9..772c587e 100644 --- 
a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingPolicyTest.java +++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SourceLocationAwareSchedulingConstraintTest.java @@ -15,6 +15,7 @@ */ package edu.snu.nemo.runtime.master.scheduler; +import edu.snu.nemo.common.ir.vertex.executionproperty.SourceLocationAwareSchedulingProperty; import edu.snu.nemo.runtime.common.plan.Task; import edu.snu.nemo.common.ir.Readable; import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter; @@ -25,16 +26,18 @@ import java.util.*; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.*; /** - * Test cases for {@link SourceLocationAwareSchedulingPolicy}. + * Test cases for {@link SourceLocationAwareSchedulingConstraint}. */ @RunWith(PowerMockRunner.class) @PrepareForTest({ExecutorRepresenter.class, Task.class, Readable.class}) -public final class SourceLocationAwareSchedulingPolicyTest { +public final class SourceLocationAwareSchedulingConstraintTest { private static final String SITE_0 = "SEOUL"; private static final String SITE_1 = "JINJU"; private static final String SITE_2 = "BUSAN"; @@ -46,12 +49,12 @@ private static ExecutorRepresenter mockExecutorRepresenter(final String executor } /** - * {@link SourceLocationAwareSchedulingPolicy} should fail to schedule a {@link Task} when + * {@link SourceLocationAwareSchedulingConstraint} should fail to schedule a {@link Task} when * there are no executors in appropriate location(s). 
*/ @Test public void testSourceLocationAwareSchedulingNotAvailable() { - final SchedulingPolicy schedulingPolicy = new SourceLocationAwareSchedulingPolicy(); + final SchedulingConstraint schedulingConstraint = new SourceLocationAwareSchedulingConstraint(); // Prepare test scenario final Task task = CreateTask.withReadablesWithSourceLocations( @@ -59,16 +62,17 @@ public void testSourceLocationAwareSchedulingNotAvailable() { final ExecutorRepresenter e0 = mockExecutorRepresenter(SITE_1); final ExecutorRepresenter e1 = mockExecutorRepresenter(SITE_1); - assertEquals(Collections.emptySet(), - schedulingPolicy.filterExecutorRepresenters(new HashSet<>(Arrays.asList(e0, e1)), task)); + assertEquals(Collections.emptySet(), Arrays.asList(e0, e1).stream() + .filter(e -> schedulingConstraint.testSchedulability(e, task)) + .collect(Collectors.toSet())); } /** - * {@link SourceLocationAwareSchedulingPolicy} should properly schedule TGs with multiple source locations. + * {@link SourceLocationAwareSchedulingConstraint} should properly schedule TGs with multiple source locations. 
*/ @Test public void testSourceLocationAwareSchedulingWithMultiSource() { - final SchedulingPolicy schedulingPolicy = new SourceLocationAwareSchedulingPolicy(); + final SchedulingConstraint schedulingConstraint = new SourceLocationAwareSchedulingConstraint(); // Prepare test scenario final Task task0 = CreateTask.withReadablesWithSourceLocations( Collections.singletonList(Collections.singletonList(SITE_1))); @@ -83,8 +87,7 @@ public void testSourceLocationAwareSchedulingWithMultiSource() { final ExecutorRepresenter e = mockExecutorRepresenter(SITE_1); for (final Task task : new HashSet<>(Arrays.asList(task0, task1, task2, task3))) { - assertEquals(new HashSet<>(Collections.singletonList(e)), schedulingPolicy.filterExecutorRepresenters( - new HashSet<>(Collections.singletonList(e)), task)); + assertTrue(schedulingConstraint.testSchedulability(e, task)); } } @@ -103,6 +106,7 @@ private static Task doCreate(final Collection<Readable> readables) { readable)); when(mockInstance.getTaskId()).thenReturn(String.format("T-%d", taskIndex.getAndIncrement())); when(mockInstance.getIrVertexIdToReadable()).thenReturn(readableMap); + when(mockInstance.getPropertyValue(SourceLocationAwareSchedulingProperty.class)).thenReturn(Optional.of(true)); return mockInstance; } diff --git a/tests/src/test/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGeneratorTest.java b/tests/src/test/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGeneratorTest.java index f8c4f902..3c74fece 100644 --- a/tests/src/test/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGeneratorTest.java +++ b/tests/src/test/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGeneratorTest.java @@ -23,7 +23,7 @@ import edu.snu.nemo.common.ir.vertex.IRVertex; import edu.snu.nemo.common.ir.vertex.OperatorVertex; import edu.snu.nemo.common.ir.vertex.executionproperty.ParallelismProperty; -import edu.snu.nemo.common.ir.vertex.executionproperty.ScheduleGroupIndexProperty; +import 
edu.snu.nemo.common.ir.vertex.executionproperty.ScheduleGroupProperty; import edu.snu.nemo.common.ir.vertex.transform.Transform; import edu.snu.nemo.common.test.EmptyComponents; import org.apache.reef.tang.Injector; @@ -63,12 +63,12 @@ public void testSplitScheduleGroupByPullStageEdges() throws Exception { final Stage s0 = stages.next(); final Stage s1 = stages.next(); - assertNotEquals(s0.getScheduleGroupIndex(), s1.getScheduleGroupIndex()); + assertNotEquals(s0.getScheduleGroup(), s1.getScheduleGroup()); } - private static final IRVertex newIRVertex(final int scheduleGroupIndex, final int parallelism) { + private static final IRVertex newIRVertex(final int scheduleGroup, final int parallelism) { final IRVertex irVertex = new OperatorVertex(EMPTY_TRANSFORM); - irVertex.getExecutionProperties().put(ScheduleGroupIndexProperty.of(scheduleGroupIndex)); + irVertex.getExecutionProperties().put(ScheduleGroupProperty.of(scheduleGroup)); irVertex.getExecutionProperties().put(ParallelismProperty.of(parallelism)); return irVertex; } diff --git a/tests/src/test/java/edu/snu/nemo/runtime/common/plan/StagePartitionerTest.java b/tests/src/test/java/edu/snu/nemo/runtime/common/plan/StagePartitionerTest.java index a60cf880..515bd951 100644 --- a/tests/src/test/java/edu/snu/nemo/runtime/common/plan/StagePartitionerTest.java +++ b/tests/src/test/java/edu/snu/nemo/runtime/common/plan/StagePartitionerTest.java @@ -24,7 +24,7 @@ import edu.snu.nemo.common.ir.vertex.executionproperty.DynamicOptimizationProperty; import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty; import edu.snu.nemo.common.ir.vertex.executionproperty.ParallelismProperty; -import edu.snu.nemo.common.ir.vertex.executionproperty.ScheduleGroupIndexProperty; +import edu.snu.nemo.common.ir.vertex.executionproperty.ScheduleGroupProperty; import edu.snu.nemo.common.ir.vertex.transform.Transform; import edu.snu.nemo.common.test.EmptyComponents; import org.apache.reef.tang.Tang; @@ -56,21 +56,21 @@ 
public void setup() throws InjectionException { /** * @param parallelism {@link ParallelismProperty} value for the new vertex - * @param scheduleGroupIndex {@link ScheduleGroupIndexProperty} value for the new vertex + * @param scheduleGroup {@link ScheduleGroupProperty} value for the new vertex * @param otherProperties other {@link VertexExecutionProperty} for the new vertex * @return new {@link IRVertex} */ - private static IRVertex newVertex(final int parallelism, final int scheduleGroupIndex, + private static IRVertex newVertex(final int parallelism, final int scheduleGroup, final List<VertexExecutionProperty> otherProperties) { final IRVertex vertex = new OperatorVertex(EMPTY_TRANSFORM); vertex.getExecutionProperties().put(ParallelismProperty.of(parallelism)); - vertex.getExecutionProperties().put(ScheduleGroupIndexProperty.of(scheduleGroupIndex)); + vertex.getExecutionProperties().put(ScheduleGroupProperty.of(scheduleGroup)); otherProperties.forEach(property -> vertex.getExecutionProperties().put(property)); return vertex; } /** - * A simple case where two vertices have common parallelism and ScheduleGroupIndex so that get merged into one stage. + * A simple case where two vertices have common parallelism and ScheduleGroup so that get merged into one stage. */ @Test public void testLinear() { @@ -101,10 +101,10 @@ public void testSplitByParallelism() { } /** - * A simple case where two vertices have different ScheduleGroupIndex. + * A simple case where two vertices have different ScheduleGroup. 
*/ @Test - public void testSplitByScheduleGroupIndex() { + public void testSplitByScheduleGroup() { final DAGBuilder<IRVertex, IREdge> dagBuilder = new DAGBuilder<>(); final IRVertex v0 = newVertex(1, 0, Collections.emptyList()); final IRVertex v1 = newVertex(1, 1, Collections.emptyList()); diff --git a/tests/src/test/java/edu/snu/nemo/runtime/master/scheduler/SchedulingConstraintnRegistryTest.java b/tests/src/test/java/edu/snu/nemo/runtime/master/scheduler/SchedulingConstraintnRegistryTest.java new file mode 100644 index 00000000..443ffca8 --- /dev/null +++ b/tests/src/test/java/edu/snu/nemo/runtime/master/scheduler/SchedulingConstraintnRegistryTest.java @@ -0,0 +1,50 @@ +/* + * Copyright (C) 2018 Seoul National University + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package edu.snu.nemo.runtime.master.scheduler; + +import edu.snu.nemo.common.ir.executionproperty.VertexExecutionProperty; +import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorPlacementProperty; +import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorSlotComplianceProperty; +import edu.snu.nemo.common.ir.vertex.executionproperty.SourceLocationAwareSchedulingProperty; +import org.apache.reef.tang.Tang; +import org.apache.reef.tang.exceptions.InjectionException; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +/** + * Tests {@link SchedulingConstraintRegistry}. 
+ */ +public final class SchedulingConstraintnRegistryTest { + @Test + public void testSchedulingConstraintRegistry() throws InjectionException { + final SchedulingConstraintRegistry registry = Tang.Factory.getTang().newInjector() + .getInstance(SchedulingConstraintRegistry.class); + assertEquals(FreeSlotSchedulingConstraint.class, getConstraintOf(ExecutorSlotComplianceProperty.class, registry)); + assertEquals(ContainerTypeAwareSchedulingConstraint.class, + getConstraintOf(ExecutorPlacementProperty.class, registry)); + assertEquals(SourceLocationAwareSchedulingConstraint.class, + getConstraintOf(SourceLocationAwareSchedulingProperty.class, registry)); + } + + private static Class<? extends SchedulingConstraint> getConstraintOf( + final Class<? extends VertexExecutionProperty> property, final SchedulingConstraintRegistry registry) { + return registry.get(property) + .orElseThrow(() -> new RuntimeException(String.format( + "No SchedulingConstraint found for property %s", property))) + .getClass(); + } +} diff --git a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/annotating/DefaultScheduleGroupPassTest.java b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/annotating/DefaultScheduleGroupPassTest.java index 5cf95b5d..33e4988c 100644 --- a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/annotating/DefaultScheduleGroupPassTest.java +++ b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/pass/compiletime/annotating/DefaultScheduleGroupPassTest.java @@ -24,7 +24,7 @@ import edu.snu.nemo.common.ir.edge.executionproperty.DataFlowModelProperty; import edu.snu.nemo.common.ir.vertex.IRVertex; import edu.snu.nemo.common.ir.vertex.OperatorVertex; -import edu.snu.nemo.common.ir.vertex.executionproperty.ScheduleGroupIndexProperty; +import edu.snu.nemo.common.ir.vertex.executionproperty.ScheduleGroupProperty; import edu.snu.nemo.common.ir.vertex.transform.Transform; import 
edu.snu.nemo.common.test.EmptyComponents; import edu.snu.nemo.compiler.optimizer.CompiletimeOptimizer; @@ -54,7 +54,7 @@ @Test public void testAnnotatingPass() { final AnnotatingPass scheduleGroupPass = new DefaultScheduleGroupPass(); - assertEquals(ScheduleGroupIndexProperty.class, scheduleGroupPass.getExecutionPropertyToModify()); + assertEquals(ScheduleGroupProperty.class, scheduleGroupPass.getExecutionPropertyToModify()); } /** @@ -67,11 +67,11 @@ public void testTopologicalOrdering() throws Exception { new TestPolicy(), ""); for (final IRVertex irVertex : processedDAG.getTopologicalSort()) { - final Integer currentScheduleGroupIndex = irVertex.getPropertyValue(ScheduleGroupIndexProperty.class).get(); - final Integer largestScheduleGroupIndexOfParent = processedDAG.getParents(irVertex.getId()).stream() - .mapToInt(v -> v.getPropertyValue(ScheduleGroupIndexProperty.class).get()) + final Integer currentScheduleGroup = irVertex.getPropertyValue(ScheduleGroupProperty.class).get(); + final Integer largestScheduleGroupOfParent = processedDAG.getParents(irVertex.getId()).stream() + .mapToInt(v -> v.getPropertyValue(ScheduleGroupProperty.class).get()) .max().orElse(0); - assertTrue(currentScheduleGroupIndex >= largestScheduleGroupIndexOfParent); + assertTrue(currentScheduleGroup >= largestScheduleGroupOfParent); } } @@ -155,31 +155,31 @@ public void testTopologicalOrdering() throws Exception { } /** - * Asserts that the {@link ScheduleGroupIndexProperty} is equal to {@code expected}. + * Asserts that the {@link ScheduleGroupProperty} is equal to {@code expected}. 
* @param expected the expected property value * @param vertex the vertex to test */ - private static void assertScheduleGroupIndex(final int expected, final IRVertex vertex) { - assertEquals(expected, getScheduleGroupIndex(vertex)); + private static void assertScheduleGroup(final int expected, final IRVertex vertex) { + assertEquals(expected, getScheduleGroup(vertex)); } /** * @param vertex a vertex - * @return {@link ScheduleGroupIndexProperty} of {@code vertex} + * @return {@link ScheduleGroupProperty} of {@code vertex} */ - private static int getScheduleGroupIndex(final IRVertex vertex) { - return vertex.getPropertyValue(ScheduleGroupIndexProperty.class) + private static int getScheduleGroup(final IRVertex vertex) { + return vertex.getPropertyValue(ScheduleGroupProperty.class) .orElseThrow(() -> new RuntimeException(String.format("ScheduleGroup not set for %s", vertex.getId()))); } /** - * Ensures that all vertices in {@code vertices} have different {@link ScheduleGroupIndexProperty} value. + * Ensures that all vertices in {@code vertices} have different {@link ScheduleGroupProperty} value. 
* @param vertices vertices to test */ - private static void assertDifferentScheduleGroupIndex(final Collection<IRVertex> vertices) { + private static void assertDifferentScheduleGroup(final Collection<IRVertex> vertices) { final Set<Integer> indices = new HashSet<>(); vertices.forEach(v -> { - final int idx = getScheduleGroupIndex(v); + final int idx = getScheduleGroup(v); assertFalse(indices.contains(idx)); indices.add(idx); }); @@ -194,7 +194,7 @@ public void testBranch() { final Pair<DAG<IRVertex, IREdge>, List<IRVertex>> dag = generateBranchDAG(DataCommunicationPatternProperty.Value.OneToOne, DataFlowModelProperty.Value.Pull); pass.apply(dag.left()); - dag.right().forEach(v -> assertScheduleGroupIndex(0, v)); + dag.right().forEach(v -> assertScheduleGroup(0, v)); } /** @@ -206,12 +206,12 @@ public void testBranchWhenMultipleInEdgeNotAllowed() { final Pair<DAG<IRVertex, IREdge>, List<IRVertex>> dag = generateBranchDAG(DataCommunicationPatternProperty.Value.OneToOne, DataFlowModelProperty.Value.Pull); pass.apply(dag.left()); - dag.right().subList(0, 4).forEach(v -> assertScheduleGroupIndex(0, v)); - dag.right().subList(4, 5).forEach(v -> assertScheduleGroupIndex(1, v)); + dag.right().subList(0, 4).forEach(v -> assertScheduleGroup(0, v)); + dag.right().subList(4, 5).forEach(v -> assertScheduleGroup(1, v)); } /** - * Test scenario to determine whether push edges properly enforces same scheduleGroupIndex or not. + * Test scenario to determine whether push edges properly enforces same scheduleGroup or not. 
*/ @Test public void testBranchWithPush() { @@ -219,7 +219,7 @@ public void testBranchWithPush() { final Pair<DAG<IRVertex, IREdge>, List<IRVertex>> dag = generateBranchDAG(DataCommunicationPatternProperty.Value.Shuffle, DataFlowModelProperty.Value.Push); pass.apply(dag.left()); - dag.right().forEach(v -> assertScheduleGroupIndex(0, v)); + dag.right().forEach(v -> assertScheduleGroup(0, v)); } /** @@ -230,7 +230,7 @@ public void testBranchWithBroadcast() { final DefaultScheduleGroupPass pass = new DefaultScheduleGroupPass(false, true, true); final Pair<DAG<IRVertex, IREdge>, List<IRVertex>> dag = generateBranchDAG(DataCommunicationPatternProperty.Value.BroadCast, DataFlowModelProperty.Value.Pull); - assertDifferentScheduleGroupIndex(pass.apply(dag.left()).getVertices()); + assertDifferentScheduleGroup(pass.apply(dag.left()).getVertices()); } /** @@ -241,7 +241,7 @@ public void testBranchWithShuffle() { final DefaultScheduleGroupPass pass = new DefaultScheduleGroupPass(true, false, true); final Pair<DAG<IRVertex, IREdge>, List<IRVertex>> dag = generateBranchDAG(DataCommunicationPatternProperty.Value.Shuffle, DataFlowModelProperty.Value.Pull); - assertDifferentScheduleGroupIndex(pass.apply(dag.left()).getVertices()); + assertDifferentScheduleGroup(pass.apply(dag.left()).getVertices()); } /** @@ -253,11 +253,11 @@ public void testJoin() { final Pair<DAG<IRVertex, IREdge>, List<IRVertex>> dag = generateJoinDAG(DataCommunicationPatternProperty.Value.OneToOne, DataFlowModelProperty.Value.Pull); pass.apply(dag.left()); - final int idxForFirstScheduleGroup = getScheduleGroupIndex(dag.right().get(0)); - final int idxForSecondScheduleGroup = getScheduleGroupIndex(dag.right().get(2)); - dag.right().subList(0, 2).forEach(v -> assertScheduleGroupIndex(idxForFirstScheduleGroup, v)); - dag.right().subList(2, 4).forEach(v -> assertScheduleGroupIndex(idxForSecondScheduleGroup, v)); - dag.right().subList(4, 6).forEach(v -> assertScheduleGroupIndex(2, v)); + final int 
idxForFirstScheduleGroup = getScheduleGroup(dag.right().get(0)); + final int idxForSecondScheduleGroup = getScheduleGroup(dag.right().get(2)); + dag.right().subList(0, 2).forEach(v -> assertScheduleGroup(idxForFirstScheduleGroup, v)); + dag.right().subList(2, 4).forEach(v -> assertScheduleGroup(idxForSecondScheduleGroup, v)); + dag.right().subList(4, 6).forEach(v -> assertScheduleGroup(2, v)); } /** @@ -269,7 +269,7 @@ public void testJoinWithPush() { final Pair<DAG<IRVertex, IREdge>, List<IRVertex>> dag = generateJoinDAG(DataCommunicationPatternProperty.Value.OneToOne, DataFlowModelProperty.Value.Push); pass.apply(dag.left()); - dag.right().forEach(v -> assertScheduleGroupIndex(0, v)); + dag.right().forEach(v -> assertScheduleGroup(0, v)); } /** @@ -283,9 +283,9 @@ public void testJoinWithSinglePush() { dag.left().getOutgoingEdgesOf(dag.right().get(1)).iterator().next() .getExecutionProperties().put(DataFlowModelProperty.of(DataFlowModelProperty.Value.Pull)); pass.apply(dag.left()); - final int idxForFirstScheduleGroup = getScheduleGroupIndex(dag.right().get(0)); - final int idxForSecondScheduleGroup = getScheduleGroupIndex(dag.right().get(2)); - dag.right().subList(0, 2).forEach(v -> assertScheduleGroupIndex(idxForFirstScheduleGroup, v)); - dag.right().subList(2, 6).forEach(v -> assertScheduleGroupIndex(idxForSecondScheduleGroup, v)); + final int idxForFirstScheduleGroup = getScheduleGroup(dag.right().get(0)); + final int idxForSecondScheduleGroup = getScheduleGroup(dag.right().get(2)); + dag.right().subList(0, 2).forEach(v -> assertScheduleGroup(idxForFirstScheduleGroup, v)); + dag.right().subList(2, 6).forEach(v -> assertScheduleGroup(idxForSecondScheduleGroup, v)); } } diff --git a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/policy/PolicyBuilderTest.java b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/policy/PolicyBuilderTest.java index b1ea8f83..41edef3f 100644 --- 
a/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/policy/PolicyBuilderTest.java +++ b/tests/src/test/java/edu/snu/nemo/tests/compiler/optimizer/policy/PolicyBuilderTest.java @@ -28,21 +28,21 @@ @Test public void testDisaggregationPolicy() { final Policy disaggregationPolicy = new DisaggregationPolicy(); - assertEquals(14, disaggregationPolicy.getCompileTimePasses().size()); + assertEquals(16, disaggregationPolicy.getCompileTimePasses().size()); assertEquals(0, disaggregationPolicy.getRuntimePasses().size()); } @Test public void testPadoPolicy() { final Policy padoPolicy = new PadoPolicy(); - assertEquals(16, padoPolicy.getCompileTimePasses().size()); + assertEquals(18, padoPolicy.getCompileTimePasses().size()); assertEquals(0, padoPolicy.getRuntimePasses().size()); } @Test public void testDataSkewPolicy() { final Policy dataSkewPolicy = new DataSkewPolicy(); - assertEquals(18, dataSkewPolicy.getCompileTimePasses().size()); + assertEquals(20, dataSkewPolicy.getCompileTimePasses().size()); assertEquals(1, dataSkewPolicy.getRuntimePasses().size()); } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org With regards, Apache Git Services
