sanha closed pull request #79: [NEMO-164] Reduce Overhead of FreeSlotSchedulingConstraint
URL: https://github.com/apache/incubator-nemo/pull/79
 
 
   

This PR was merged from a forked repository.
Because GitHub does not display the original diff of a merged pull request
that came from a fork, the diff is reproduced below for the sake of
provenance:

diff --git 
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ExecutorRepresenter.java
 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ExecutorRepresenter.java
index e78cb90bc..6a0a3ee13 100644
--- 
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ExecutorRepresenter.java
+++ 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ExecutorRepresenter.java
@@ -16,6 +16,7 @@
 package edu.snu.nemo.runtime.master.resource;
 
 import com.google.protobuf.ByteString;
+import 
edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorSlotComplianceProperty;
 import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
 import edu.snu.nemo.runtime.common.comm.ControlMessage;
 import edu.snu.nemo.runtime.common.message.MessageEnvironment;
@@ -28,6 +29,7 @@
 import java.util.*;
 import java.util.concurrent.ExecutorService;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 /**
  * (WARNING) This class is not thread-safe, and thus should only be accessed 
through ExecutorRegistry.
@@ -45,7 +47,8 @@
 
   private final String executorId;
   private final ResourceSpecification resourceSpecification;
-  private final Set<Task> runningTasks;
+  private final Map<String, Task> runningComplyingTasks;
+  private final Map<String, Task> runningNonComplyingTasks;
   private final Map<Task, Integer> runningTaskToAttempt;
   private final Set<Task> completeTasks;
   private final Set<Task> failedTasks;
@@ -72,7 +75,8 @@ public ExecutorRepresenter(final String executorId,
     this.executorId = executorId;
     this.resourceSpecification = resourceSpecification;
     this.messageSender = messageSender;
-    this.runningTasks = new HashSet<>();
+    this.runningComplyingTasks = new HashMap<>();
+    this.runningNonComplyingTasks = new HashMap<>();
     this.runningTaskToAttempt = new HashMap<>();
     this.completeTasks = new HashSet<>();
     this.failedTasks = new HashSet<>();
@@ -85,12 +89,13 @@ public ExecutorRepresenter(final String executorId,
    * Marks all Tasks which were running in this executor as failed.
    */
   public Set<String> onExecutorFailed() {
-    failedTasks.addAll(runningTasks);
-    final Set<String> snapshot = runningTasks.stream()
-        .map(Task::getTaskId)
-        .collect(Collectors.toSet());
-    runningTasks.clear();
-    return snapshot;
+    failedTasks.addAll(runningComplyingTasks.values());
+    failedTasks.addAll(runningNonComplyingTasks.values());
+    final Set<String> taskIds = 
Stream.concat(runningComplyingTasks.keySet().stream(),
+        
runningNonComplyingTasks.keySet().stream()).collect(Collectors.toSet());
+    runningComplyingTasks.clear();
+    runningNonComplyingTasks.clear();
+    return taskIds;
   }
 
   /**
@@ -98,25 +103,23 @@ public ExecutorRepresenter(final String executorId,
    * @param task
    */
   public void onTaskScheduled(final Task task) {
-    runningTasks.add(task);
+    (task.getPropertyValue(ExecutorSlotComplianceProperty.class).orElse(true)
+        ? runningComplyingTasks : 
runningNonComplyingTasks).put(task.getTaskId(), task);
     runningTaskToAttempt.put(task, task.getAttemptIdx());
     failedTasks.remove(task);
 
-    serializationExecutorService.submit(new Runnable() {
-      @Override
-      public void run() {
-        final byte[] serialized = SerializationUtils.serialize(task);
-        sendControlMessage(
-            ControlMessage.Message.newBuilder()
-                .setId(RuntimeIdGenerator.generateMessageId())
-                .setListenerId(MessageEnvironment.EXECUTOR_MESSAGE_LISTENER_ID)
-                .setType(ControlMessage.MessageType.ScheduleTask)
-                .setScheduleTaskMsg(
-                    ControlMessage.ScheduleTaskMsg.newBuilder()
-                        .setTask(ByteString.copyFrom(serialized))
-                        .build())
-                .build());
-      }
+    serializationExecutorService.submit(() -> {
+      final byte[] serialized = SerializationUtils.serialize(task);
+      sendControlMessage(
+          ControlMessage.Message.newBuilder()
+              .setId(RuntimeIdGenerator.generateMessageId())
+              .setListenerId(MessageEnvironment.EXECUTOR_MESSAGE_LISTENER_ID)
+              .setType(ControlMessage.MessageType.ScheduleTask)
+              .setScheduleTaskMsg(
+                  ControlMessage.ScheduleTaskMsg.newBuilder()
+                      .setTask(ByteString.copyFrom(serialized))
+                      .build())
+              .build());
     });
   }
 
@@ -133,11 +136,7 @@ public void sendControlMessage(final 
ControlMessage.Message message) {
    *
    */
   public void onTaskExecutionComplete(final String taskId) {
-    Task completedTask = runningTasks.stream()
-        .filter(task -> task.getTaskId().equals(taskId)).findFirst()
-        .orElseThrow(() -> new RuntimeException("Completed task not found in 
its ExecutorRepresenter"));
-
-    runningTasks.remove(completedTask);
+    final Task completedTask = removeFromRunningTasks(taskId);
     runningTaskToAttempt.remove(completedTask);
     completeTasks.add(completedTask);
   }
@@ -147,11 +146,7 @@ public void onTaskExecutionComplete(final String taskId) {
    * @param taskId id of the Task
    */
   public void onTaskExecutionFailed(final String taskId) {
-    Task failedTask = runningTasks.stream()
-        .filter(task -> task.getTaskId().equals(taskId)).findFirst()
-        .orElseThrow(() -> new RuntimeException("Failed task not found in its 
ExecutorRepresenter"));
-
-    runningTasks.remove(failedTask);
+    final Task failedTask = removeFromRunningTasks(taskId);
     runningTaskToAttempt.remove(failedTask);
     failedTasks.add(failedTask);
   }
@@ -167,7 +162,29 @@ public int getExecutorCapacity() {
    * @return the current snapshot of set of Tasks that are running in this 
executor.
    */
   public Set<Task> getRunningTasks() {
-    return Collections.unmodifiableSet(new HashSet<>(runningTasks));
+    return Stream.concat(runningComplyingTasks.values().stream(),
+        
runningNonComplyingTasks.values().stream()).collect(Collectors.toSet());
+  }
+
+  /**
+   * @return the number of running {@link Task}s.
+   */
+  public int getNumOfRunningTasks() {
+    return getNumOfComplyingRunningTasks() + 
getNumOfNonComplyingRunningTasks();
+  }
+
+  /**
+   * @return the number of running {@link Task}s that complies to the executor 
slot restriction.
+   */
+  public int getNumOfComplyingRunningTasks() {
+    return runningComplyingTasks.size();
+  }
+
+  /**
+   * @return the number of running {@link Task}s that does not comply to the 
executor slot restriction.
+   */
+  public int getNumOfNonComplyingRunningTasks() {
+    return runningNonComplyingTasks.size();
   }
 
   /**
@@ -202,10 +219,28 @@ public void shutDown() {
   public String toString() {
     final StringBuffer sb = new StringBuffer("ExecutorRepresenter{");
     sb.append("executorId='").append(executorId).append('\'');
-    sb.append(", runningTasks=").append(runningTasks);
+    sb.append(", runningTasks=").append(getRunningTasks());
     sb.append(", failedTasks=").append(failedTasks);
     sb.append('}');
     return sb.toString();
   }
+
+  /**
+   * Removes the specified {@link Task} from the map of running tasks.
+   *
+   * @param taskId id of the task to remove
+   * @return the removed {@link Task}
+   */
+  private Task removeFromRunningTasks(final String taskId) {
+    final Task task;
+    if (runningComplyingTasks.containsKey(taskId)) {
+      task = runningComplyingTasks.remove(taskId);
+    } else if (runningNonComplyingTasks.containsKey(taskId)) {
+      task = runningNonComplyingTasks.remove(taskId);
+    } else {
+      throw new RuntimeException(String.format("Task %s not found in its 
ExecutorRepresenter", taskId));
+    }
+    return task;
+  }
 }
 
diff --git 
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingConstraint.java
 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingConstraint.java
index f25e4bc1a..c330cd45c 100644
--- 
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingConstraint.java
+++ 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingConstraint.java
@@ -38,11 +38,6 @@ public boolean testSchedulability(final ExecutorRepresenter 
executor, final Task
       return true;
     }
 
-    // Count the number of tasks which are running in this executor and 
complying the slot constraint.
-    final long numOfComplyingTasks = executor.getRunningTasks().stream()
-        .filter(runningTask -> 
runningTask.getPropertyValue(ExecutorSlotComplianceProperty.class)
-            .orElseGet(() -> true))
-        .count();
-    return numOfComplyingTasks < executor.getExecutorCapacity();
+    return executor.getNumOfComplyingRunningTasks() < 
executor.getExecutorCapacity();
   }
 }
diff --git 
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/MinOccupancyFirstSchedulingPolicy.java
 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/MinOccupancyFirstSchedulingPolicy.java
index 1261d44a8..0fa51b582 100644
--- 
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/MinOccupancyFirstSchedulingPolicy.java
+++ 
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/MinOccupancyFirstSchedulingPolicy.java
@@ -38,7 +38,7 @@ private MinOccupancyFirstSchedulingPolicy() {
   public ExecutorRepresenter selectExecutor(final 
Collection<ExecutorRepresenter> executors, final Task task) {
     final OptionalInt minOccupancy =
         executors.stream()
-        .map(executor -> executor.getRunningTasks().size())
+        .map(executor -> executor.getNumOfRunningTasks())
         .mapToInt(i -> i).min();
 
     if (!minOccupancy.isPresent()) {
@@ -46,7 +46,7 @@ public ExecutorRepresenter selectExecutor(final 
Collection<ExecutorRepresenter>
     }
 
     return executors.stream()
-        .filter(executor -> executor.getRunningTasks().size() == 
minOccupancy.getAsInt())
+        .filter(executor -> executor.getNumOfRunningTasks() == 
minOccupancy.getAsInt())
         .findFirst()
         .orElseThrow(() -> new RuntimeException("No such executor"));
   }
diff --git 
a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingConstraintTest.java
 
b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingConstraintTest.java
index e387c709f..871be92fc 100644
--- 
a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingConstraintTest.java
+++ 
b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingConstraintTest.java
@@ -19,7 +19,6 @@
 import edu.snu.nemo.runtime.common.plan.Task;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 import org.apache.reef.tang.Tang;
-import org.apache.reef.tang.exceptions.InjectionException;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -28,7 +27,6 @@
 
 import java.util.*;
 import java.util.stream.Collectors;
-import java.util.stream.IntStream;
 
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Mockito.*;
@@ -46,42 +44,21 @@
   @Before
   public void setUp() throws Exception {
     schedulingConstraint = 
Tang.Factory.getTang().newInjector().getInstance(FreeSlotSchedulingConstraint.class);
-    a0 = mockExecutorRepresenter(1, 1, 1);
-    a1 = mockExecutorRepresenter(2, 2, 3);
-  }
-
-  /**
-   * Mock a task.
-   *
-   * @param taskId the ID of the task to mock.
-   * @return the mocked task.
-   */
-  private static Task mockTask(final String taskId) {
-    final Task task = mock(Task.class);
-    when(task.getTaskId()).thenReturn(taskId);
-    return task;
+    a0 = mockExecutorRepresenter(1, 1);
+    a1 = mockExecutorRepresenter(2, 3);
   }
 
   /**
    * Mock an executor representer.
    *
    * @param numComplyingTasks the number of already running (mocked) tasks 
which comply slot constraint in the executor.
-   * @param numIgnoringTasks  the number of already running (mocked) tasks 
which ignore slot constraint in the executor.
    * @param capacity          the capacity of the executor.
    * @return the mocked executor.
    */
   private static ExecutorRepresenter mockExecutorRepresenter(final int 
numComplyingTasks,
-                                                             final int 
numIgnoringTasks,
                                                              final int 
capacity) {
     final ExecutorRepresenter executorRepresenter = 
mock(ExecutorRepresenter.class);
-    final Set<Task> runningTasks = new HashSet<>();
-    IntStream.range(0, numComplyingTasks).forEach(i -> 
runningTasks.add(mockTask(String.valueOf(i))));
-    IntStream.range(0, numIgnoringTasks).forEach(i -> {
-      final Task task = mockTask(String.valueOf(numComplyingTasks + i));
-      
when(task.getPropertyValue(ExecutorSlotComplianceProperty.class)).thenReturn(Optional.of(false));
-      runningTasks.add(task);
-    });
-    when(executorRepresenter.getRunningTasks()).thenReturn(runningTasks);
+    
when(executorRepresenter.getNumOfComplyingRunningTasks()).thenReturn(numComplyingTasks);
     when(executorRepresenter.getExecutorCapacity()).thenReturn(capacity);
     return executorRepresenter;
   }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to