johnyangk closed pull request #74: [NEMO-141] Make vertices receiving push edge 
not comply executor slot
URL: https://github.com/apache/incubator-nemo/pull/74
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request comes from a fork, GitHub no longer shows its
diff once it has been merged, so the diff is reproduced below:

diff --git 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SailfishVertexExecutorSlotCompliancePass.java
 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SailfishVertexExecutorSlotCompliancePass.java
new file mode 100644
index 000000000..4b18885e8
--- /dev/null
+++ 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SailfishVertexExecutorSlotCompliancePass.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating;
+
+import edu.snu.nemo.common.dag.DAG;
+import edu.snu.nemo.common.ir.edge.IREdge;
+import edu.snu.nemo.common.ir.edge.executionproperty.DataFlowModelProperty;
+import edu.snu.nemo.common.ir.vertex.IRVertex;
+import 
edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorSlotComplianceProperty;
+
+import java.util.Collections;
+
+/**
+ * Sets {@link ExecutorSlotComplianceProperty}.
+ */
+public final class SailfishVertexExecutorSlotCompliancePass extends 
AnnotatingPass {
+
+  public SailfishVertexExecutorSlotCompliancePass() {
+    super(ExecutorSlotComplianceProperty.class, 
Collections.singleton(DataFlowModelProperty.class));
+  }
+
+  @Override
+  public DAG<IRVertex, IREdge> apply(final DAG<IRVertex, IREdge> dag) {
+    // On every vertex that receives a push edge, if 
ExecutorSlotComplianceProperty is not set, set it to false.
+    // For other vertices, if ExecutorSlotComplianceProperty is not set, set 
it to true.
+    dag.getVertices().stream()
+        .filter(v -> 
!v.getExecutionProperties().containsKey(ExecutorSlotComplianceProperty.class))
+        .forEach(v -> {
+          if (dag.getIncomingEdgesOf(v).stream().anyMatch(
+              e -> e.getPropertyValue(DataFlowModelProperty.class)
+                  .orElseThrow(() -> new 
RuntimeException(String.format("DataFlowModelProperty for %s must be set",
+                      e.getId()))).equals(DataFlowModelProperty.Value.Push))) {
+            
v.getExecutionProperties().put(ExecutorSlotComplianceProperty.of(false));
+          } else {
+            
v.getExecutionProperties().put(ExecutorSlotComplianceProperty.of(true));
+          }
+        });
+    return dag;
+  }
+}
diff --git 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/SailfishPass.java
 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/SailfishPass.java
index d5c50e4bf..8ffd41fc9 100644
--- 
a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/SailfishPass.java
+++ 
b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/SailfishPass.java
@@ -34,7 +34,8 @@ public SailfishPass() {
         new SailfishEdgeDataStorePass(),
         new SailfishEdgeDecoderPass(),
         new SailfishEdgeEncoderPass(),
-        new SailfishEdgeUsedDataHandlingPass()
+        new SailfishEdgeUsedDataHandlingPass(),
+        new SailfishVertexExecutorSlotCompliancePass()
     ));
   }
 }
diff --git a/examples/resources/beam_sample_one_executor_resources.json 
b/examples/resources/beam_sample_one_executor_resources.json
index 069ed973d..4d6aff44e 100644
--- a/examples/resources/beam_sample_one_executor_resources.json
+++ b/examples/resources/beam_sample_one_executor_resources.json
@@ -2,6 +2,6 @@
   {
     "type": "Transient",
     "memory_mb": 512,
-    "capacity": 5
+    "capacity": 2
   }
 ]
diff --git 
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ExecutorRepresenter.java
 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ExecutorRepresenter.java
index e0df7662d..e78cb90bc 100644
--- 
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ExecutorRepresenter.java
+++ 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ExecutorRepresenter.java
@@ -164,10 +164,10 @@ public int getExecutorCapacity() {
   }
 
   /**
-   * @return set of ids of Tasks that are running in this executor
+   * @return a snapshot of the set of Tasks currently running in this 
executor.
    */
   public Set<Task> getRunningTasks() {
-    return runningTasks;
+    return Collections.unmodifiableSet(new HashSet<>(runningTasks));
   }
 
   /**
diff --git 
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingConstraint.java
 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingConstraint.java
index 1fc1f6e62..f25e4bc1a 100644
--- 
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingConstraint.java
+++ 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingConstraint.java
@@ -15,7 +15,6 @@
  */
 package edu.snu.nemo.runtime.master.scheduler;
 
-import com.google.common.annotations.VisibleForTesting;
 import edu.snu.nemo.common.ir.executionproperty.AssociatedProperty;
 import 
edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorSlotComplianceProperty;
 import edu.snu.nemo.runtime.common.plan.Task;
@@ -28,9 +27,9 @@
  */
 @AssociatedProperty(ExecutorSlotComplianceProperty.class)
 public final class FreeSlotSchedulingConstraint implements 
SchedulingConstraint {
-  @VisibleForTesting
+
   @Inject
-  public FreeSlotSchedulingConstraint() {
+  private FreeSlotSchedulingConstraint() {
   }
 
   @Override
@@ -39,6 +38,11 @@ public boolean testSchedulability(final ExecutorRepresenter 
executor, final Task
       return true;
     }
 
-    return executor.getRunningTasks().size() < executor.getExecutorCapacity();
+    // Count the number of tasks which are running in this executor and 
complying with the slot constraint.
+    final long numOfComplyingTasks = executor.getRunningTasks().stream()
+        .filter(runningTask -> 
runningTask.getPropertyValue(ExecutorSlotComplianceProperty.class)
+            .orElseGet(() -> true))
+        .count();
+    return numOfComplyingTasks < executor.getExecutorCapacity();
   }
 }
diff --git 
a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingConstraintTest.java
 
b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingConstraintTest.java
index 7449732cd..a4df78507 100644
--- 
a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingConstraintTest.java
+++ 
b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingConstraintTest.java
@@ -18,6 +18,8 @@
 import 
edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorSlotComplianceProperty;
 import edu.snu.nemo.runtime.common.plan.Task;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
+import org.apache.reef.tang.Tang;
+import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.powermock.core.classloader.annotations.PrepareForTest;
@@ -36,28 +38,58 @@
 @RunWith(PowerMockRunner.class)
 @PrepareForTest({ExecutorRepresenter.class, Task.class})
 public final class FreeSlotSchedulingConstraintTest {
+  private SchedulingConstraint schedulingConstraint;
+  private ExecutorRepresenter a0;
+  private ExecutorRepresenter a1;
 
+  @Before
+  public void setUp() throws Exception {
+    schedulingConstraint = 
Tang.Factory.getTang().newInjector().getInstance(FreeSlotSchedulingConstraint.class);
+    a0 = mockExecutorRepresenter(1, 1, 1);
+    a1 = mockExecutorRepresenter(2, 2, 3);
+  }
+
+  /**
+   * Mock a task.
+   *
+   * @param taskId the ID of the task to mock.
+   * @return the mocked task.
+   */
   private static Task mockTask(final String taskId) {
     final Task task = mock(Task.class);
     when(task.getTaskId()).thenReturn(taskId);
     return task;
   }
 
-  private static ExecutorRepresenter mockExecutorRepresenter(final int 
numRunningTasks,
+  /**
+   * Mock an executor representer.
+   *
+   * @param numComplyingTasks the number of already running (mocked) tasks 
which comply with the slot constraint in the executor.
+   * @param numIgnoringTasks  the number of already running (mocked) tasks 
which ignore the slot constraint in the executor.
+   * @param capacity          the capacity of the executor.
+   * @return the mocked executor.
+   */
+  private static ExecutorRepresenter mockExecutorRepresenter(final int 
numComplyingTasks,
+                                                             final int 
numIgnoringTasks,
                                                              final int 
capacity) {
     final ExecutorRepresenter executorRepresenter = 
mock(ExecutorRepresenter.class);
     final Set<Task> runningTasks = new HashSet<>();
-    IntStream.range(0, numRunningTasks).forEach(i -> 
runningTasks.add(mockTask(String.valueOf(i))));
+    IntStream.range(0, numComplyingTasks).forEach(i -> 
runningTasks.add(mockTask(String.valueOf(i))));
+    IntStream.range(0, numIgnoringTasks).forEach(i -> {
+      final Task task = mockTask(String.valueOf(numComplyingTasks + i));
+      
when(task.getPropertyValue(ExecutorSlotComplianceProperty.class)).thenReturn(Optional.of(false));
+      runningTasks.add(task);
+    });
     when(executorRepresenter.getRunningTasks()).thenReturn(runningTasks);
     when(executorRepresenter.getExecutorCapacity()).thenReturn(capacity);
     return executorRepresenter;
   }
 
+  /**
+   * Tests whether the constraint filters out full executors.
+   */
   @Test
   public void testFreeSlot() {
-    final SchedulingConstraint schedulingConstraint = new 
FreeSlotSchedulingConstraint();
-    final ExecutorRepresenter a0 = mockExecutorRepresenter(1, 1);
-    final ExecutorRepresenter a1 = mockExecutorRepresenter(2, 3);
 
     final Task task = mock(Task.class);
     
when(task.getPropertyValue(ExecutorSlotComplianceProperty.class)).thenReturn(Optional.of(true));
@@ -71,4 +103,23 @@ public void testFreeSlot() {
     final Set<ExecutorRepresenter> expectedExecutors = 
Collections.singleton(a1);
     assertEquals(expectedExecutors, candidateExecutors);
   }
+
+  /**
+   * Tests that a task whose compliance property is false is not filtered out by the 
constraint.
+   */
+  @Test
+  public void testIgnoringSlot() {
+
+    final Task task = mock(Task.class);
+    
when(task.getPropertyValue(ExecutorSlotComplianceProperty.class)).thenReturn(Optional.of(false));
+
+    final Set<ExecutorRepresenter> executorRepresenterList = new 
HashSet<>(Arrays.asList(a0, a1));
+
+    final Set<ExecutorRepresenter> candidateExecutors = 
executorRepresenterList.stream()
+        .filter(e -> schedulingConstraint.testSchedulability(e, task))
+        .collect(Collectors.toSet());
+
+    final Set<ExecutorRepresenter> expectedExecutors = new 
HashSet<>(Arrays.asList(a0, a1));
+    assertEquals(expectedExecutors, candidateExecutors);
+  }
 }
diff --git 
a/tests/src/test/java/edu/snu/nemo/tests/compiler/backend/nemo/NemoBackendTest.java
 
b/tests/src/test/java/edu/snu/nemo/tests/compiler/backend/nemo/NemoBackendTest.java
index d78479563..0f24f55b2 100644
--- 
a/tests/src/test/java/edu/snu/nemo/tests/compiler/backend/nemo/NemoBackendTest.java
+++ 
b/tests/src/test/java/edu/snu/nemo/tests/compiler/backend/nemo/NemoBackendTest.java
@@ -76,7 +76,7 @@ public void testExecutionPlanGeneration() {
     final PhysicalPlan executionPlan = backend.compile(dag, 
physicalPlanGenerator);
 
     assertEquals(2, executionPlan.getStageDAG().getVertices().size());
-    assertEquals(1, 
executionPlan.getStageDAG().getTopologicalSort().get(0).getTaskIds().size());
-    assertEquals(1, 
executionPlan.getStageDAG().getTopologicalSort().get(1).getTaskIds().size());
+    assertEquals(2, 
executionPlan.getStageDAG().getTopologicalSort().get(0).getIRDAG().getVertices().size());
+    assertEquals(3, 
executionPlan.getStageDAG().getTopologicalSort().get(1).getIRDAG().getVertices().size());
   }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to