johnyangk closed pull request #74: [NEMO-141] Make vertices receiving push edge not comply executor slot URL: https://github.com/apache/incubator-nemo/pull/74
This is a PR merged from a forked repository. Because GitHub hides the original diff of a foreign pull request (one from a fork) once it is merged, the diff is reproduced below for the sake of provenance: diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SailfishVertexExecutorSlotCompliancePass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SailfishVertexExecutorSlotCompliancePass.java new file mode 100644 index 000000000..4b18885e8 --- /dev/null +++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SailfishVertexExecutorSlotCompliancePass.java @@ -0,0 +1,53 @@ +/* + * Copyright (C) 2018 Seoul National University + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating; + +import edu.snu.nemo.common.dag.DAG; +import edu.snu.nemo.common.ir.edge.IREdge; +import edu.snu.nemo.common.ir.edge.executionproperty.DataFlowModelProperty; +import edu.snu.nemo.common.ir.vertex.IRVertex; +import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorSlotComplianceProperty; + +import java.util.Collections; + +/** + * Sets {@link ExecutorSlotComplianceProperty}. 
+ */ +public final class SailfishVertexExecutorSlotCompliancePass extends AnnotatingPass { + + public SailfishVertexExecutorSlotCompliancePass() { + super(ExecutorSlotComplianceProperty.class, Collections.singleton(DataFlowModelProperty.class)); + } + + @Override + public DAG<IRVertex, IREdge> apply(final DAG<IRVertex, IREdge> dag) { + // On every vertex that receive push edge, if ExecutorSlotComplianceProperty is not set, put it as false. + // For other vertices, if ExecutorSlotComplianceProperty is not set, put it as true. + dag.getVertices().stream() + .filter(v -> !v.getExecutionProperties().containsKey(ExecutorSlotComplianceProperty.class)) + .forEach(v -> { + if (dag.getIncomingEdgesOf(v).stream().anyMatch( + e -> e.getPropertyValue(DataFlowModelProperty.class) + .orElseThrow(() -> new RuntimeException(String.format("DataFlowModelProperty for %s must be set", + e.getId()))).equals(DataFlowModelProperty.Value.Push))) { + v.getExecutionProperties().put(ExecutorSlotComplianceProperty.of(false)); + } else { + v.getExecutionProperties().put(ExecutorSlotComplianceProperty.of(true)); + } + }); + return dag; + } +} diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/SailfishPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/SailfishPass.java index d5c50e4bf..8ffd41fc9 100644 --- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/SailfishPass.java +++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/SailfishPass.java @@ -34,7 +34,8 @@ public SailfishPass() { new SailfishEdgeDataStorePass(), new SailfishEdgeDecoderPass(), new SailfishEdgeEncoderPass(), - new SailfishEdgeUsedDataHandlingPass() + new SailfishEdgeUsedDataHandlingPass(), + new SailfishVertexExecutorSlotCompliancePass() )); } } diff --git a/examples/resources/beam_sample_one_executor_resources.json 
b/examples/resources/beam_sample_one_executor_resources.json index 069ed973d..4d6aff44e 100644 --- a/examples/resources/beam_sample_one_executor_resources.json +++ b/examples/resources/beam_sample_one_executor_resources.json @@ -2,6 +2,6 @@ { "type": "Transient", "memory_mb": 512, - "capacity": 5 + "capacity": 2 } ] diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ExecutorRepresenter.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ExecutorRepresenter.java index e0df7662d..e78cb90bc 100644 --- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ExecutorRepresenter.java +++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ExecutorRepresenter.java @@ -164,10 +164,10 @@ public int getExecutorCapacity() { } /** - * @return set of ids of Tasks that are running in this executor + * @return the current snapshot of set of Tasks that are running in this executor. */ public Set<Task> getRunningTasks() { - return runningTasks; + return Collections.unmodifiableSet(new HashSet<>(runningTasks)); } /** diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingConstraint.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingConstraint.java index 1fc1f6e62..f25e4bc1a 100644 --- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingConstraint.java +++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingConstraint.java @@ -15,7 +15,6 @@ */ package edu.snu.nemo.runtime.master.scheduler; -import com.google.common.annotations.VisibleForTesting; import edu.snu.nemo.common.ir.executionproperty.AssociatedProperty; import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorSlotComplianceProperty; import edu.snu.nemo.runtime.common.plan.Task; @@ -28,9 +27,9 @@ */ @AssociatedProperty(ExecutorSlotComplianceProperty.class) public final class 
FreeSlotSchedulingConstraint implements SchedulingConstraint { - @VisibleForTesting + @Inject - public FreeSlotSchedulingConstraint() { + private FreeSlotSchedulingConstraint() { } @Override @@ -39,6 +38,11 @@ public boolean testSchedulability(final ExecutorRepresenter executor, final Task return true; } - return executor.getRunningTasks().size() < executor.getExecutorCapacity(); + // Count the number of tasks which are running in this executor and complying the slot constraint. + final long numOfComplyingTasks = executor.getRunningTasks().stream() + .filter(runningTask -> runningTask.getPropertyValue(ExecutorSlotComplianceProperty.class) + .orElseGet(() -> true)) + .count(); + return numOfComplyingTasks < executor.getExecutorCapacity(); } } diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingConstraintTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingConstraintTest.java index 7449732cd..a4df78507 100644 --- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingConstraintTest.java +++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingConstraintTest.java @@ -18,6 +18,8 @@ import edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorSlotComplianceProperty; import edu.snu.nemo.runtime.common.plan.Task; import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter; +import org.apache.reef.tang.Tang; +import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.powermock.core.classloader.annotations.PrepareForTest; @@ -36,28 +38,58 @@ @RunWith(PowerMockRunner.class) @PrepareForTest({ExecutorRepresenter.class, Task.class}) public final class FreeSlotSchedulingConstraintTest { + private SchedulingConstraint schedulingConstraint; + private ExecutorRepresenter a0; + private ExecutorRepresenter a1; + @Before + public void setUp() throws Exception { + schedulingConstraint = 
Tang.Factory.getTang().newInjector().getInstance(FreeSlotSchedulingConstraint.class); + a0 = mockExecutorRepresenter(1, 1, 1); + a1 = mockExecutorRepresenter(2, 2, 3); + } + + /** + * Mock a task. + * + * @param taskId the ID of the task to mock. + * @return the mocked task. + */ private static Task mockTask(final String taskId) { final Task task = mock(Task.class); when(task.getTaskId()).thenReturn(taskId); return task; } - private static ExecutorRepresenter mockExecutorRepresenter(final int numRunningTasks, + /** + * Mock an executor representer. + * + * @param numComplyingTasks the number of already running (mocked) tasks which comply slot constraint in the executor. + * @param numIgnoringTasks the number of already running (mocked) tasks which ignore slot constraint in the executor. + * @param capacity the capacity of the executor. + * @return the mocked executor. + */ + private static ExecutorRepresenter mockExecutorRepresenter(final int numComplyingTasks, + final int numIgnoringTasks, final int capacity) { final ExecutorRepresenter executorRepresenter = mock(ExecutorRepresenter.class); final Set<Task> runningTasks = new HashSet<>(); - IntStream.range(0, numRunningTasks).forEach(i -> runningTasks.add(mockTask(String.valueOf(i)))); + IntStream.range(0, numComplyingTasks).forEach(i -> runningTasks.add(mockTask(String.valueOf(i)))); + IntStream.range(0, numIgnoringTasks).forEach(i -> { + final Task task = mockTask(String.valueOf(numComplyingTasks + i)); + when(task.getPropertyValue(ExecutorSlotComplianceProperty.class)).thenReturn(Optional.of(false)); + runningTasks.add(task); + }); when(executorRepresenter.getRunningTasks()).thenReturn(runningTasks); when(executorRepresenter.getExecutorCapacity()).thenReturn(capacity); return executorRepresenter; } + /** + * Test whether the constraint filter full executors. 
+ */ @Test public void testFreeSlot() { - final SchedulingConstraint schedulingConstraint = new FreeSlotSchedulingConstraint(); - final ExecutorRepresenter a0 = mockExecutorRepresenter(1, 1); - final ExecutorRepresenter a1 = mockExecutorRepresenter(2, 3); final Task task = mock(Task.class); when(task.getPropertyValue(ExecutorSlotComplianceProperty.class)).thenReturn(Optional.of(true)); @@ -71,4 +103,23 @@ public void testFreeSlot() { final Set<ExecutorRepresenter> expectedExecutors = Collections.singleton(a1); assertEquals(expectedExecutors, candidateExecutors); } + + /** + * Test whether a task with false compliance property is not filtered by the constraint. + */ + @Test + public void testIgnoringSlot() { + + final Task task = mock(Task.class); + when(task.getPropertyValue(ExecutorSlotComplianceProperty.class)).thenReturn(Optional.of(false)); + + final Set<ExecutorRepresenter> executorRepresenterList = new HashSet<>(Arrays.asList(a0, a1)); + + final Set<ExecutorRepresenter> candidateExecutors = executorRepresenterList.stream() + .filter(e -> schedulingConstraint.testSchedulability(e, task)) + .collect(Collectors.toSet()); + + final Set<ExecutorRepresenter> expectedExecutors = new HashSet<>(Arrays.asList(a0, a1)); + assertEquals(expectedExecutors, candidateExecutors); + } } diff --git a/tests/src/test/java/edu/snu/nemo/tests/compiler/backend/nemo/NemoBackendTest.java b/tests/src/test/java/edu/snu/nemo/tests/compiler/backend/nemo/NemoBackendTest.java index d78479563..0f24f55b2 100644 --- a/tests/src/test/java/edu/snu/nemo/tests/compiler/backend/nemo/NemoBackendTest.java +++ b/tests/src/test/java/edu/snu/nemo/tests/compiler/backend/nemo/NemoBackendTest.java @@ -76,7 +76,7 @@ public void testExecutionPlanGeneration() { final PhysicalPlan executionPlan = backend.compile(dag, physicalPlanGenerator); assertEquals(2, executionPlan.getStageDAG().getVertices().size()); - assertEquals(1, executionPlan.getStageDAG().getTopologicalSort().get(0).getTaskIds().size()); - 
assertEquals(1, executionPlan.getStageDAG().getTopologicalSort().get(1).getTaskIds().size()); + assertEquals(2, executionPlan.getStageDAG().getTopologicalSort().get(0).getIRDAG().getVertices().size()); + assertEquals(3, executionPlan.getStageDAG().getTopologicalSort().get(1).getIRDAG().getVertices().size()); } } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] With regards, Apache Git Services
