sanha closed pull request #79: [NEMO-164] Reduce Overhead of
FreeSlotSchedulingConstraint
URL: https://github.com/apache/incubator-nemo/pull/79
This is a PR merged from a forked repository.
Because GitHub hides the original diff of a pull request from a fork once
it has been merged, the diff is reproduced below for the sake of provenance:
diff --git
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ExecutorRepresenter.java
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ExecutorRepresenter.java
index e78cb90bc..6a0a3ee13 100644
---
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ExecutorRepresenter.java
+++
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ExecutorRepresenter.java
@@ -16,6 +16,7 @@
package edu.snu.nemo.runtime.master.resource;
import com.google.protobuf.ByteString;
+import
edu.snu.nemo.common.ir.vertex.executionproperty.ExecutorSlotComplianceProperty;
import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
import edu.snu.nemo.runtime.common.comm.ControlMessage;
import edu.snu.nemo.runtime.common.message.MessageEnvironment;
@@ -28,6 +29,7 @@
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
/**
* (WARNING) This class is not thread-safe, and thus should only be accessed
through ExecutorRegistry.
@@ -45,7 +47,8 @@
private final String executorId;
private final ResourceSpecification resourceSpecification;
- private final Set<Task> runningTasks;
+ private final Map<String, Task> runningComplyingTasks;
+ private final Map<String, Task> runningNonComplyingTasks;
private final Map<Task, Integer> runningTaskToAttempt;
private final Set<Task> completeTasks;
private final Set<Task> failedTasks;
@@ -72,7 +75,8 @@ public ExecutorRepresenter(final String executorId,
this.executorId = executorId;
this.resourceSpecification = resourceSpecification;
this.messageSender = messageSender;
- this.runningTasks = new HashSet<>();
+ this.runningComplyingTasks = new HashMap<>();
+ this.runningNonComplyingTasks = new HashMap<>();
this.runningTaskToAttempt = new HashMap<>();
this.completeTasks = new HashSet<>();
this.failedTasks = new HashSet<>();
@@ -85,12 +89,13 @@ public ExecutorRepresenter(final String executorId,
* Marks all Tasks which were running in this executor as failed.
*/
public Set<String> onExecutorFailed() {
- failedTasks.addAll(runningTasks);
- final Set<String> snapshot = runningTasks.stream()
- .map(Task::getTaskId)
- .collect(Collectors.toSet());
- runningTasks.clear();
- return snapshot;
+ failedTasks.addAll(runningComplyingTasks.values());
+ failedTasks.addAll(runningNonComplyingTasks.values());
+ final Set<String> taskIds =
Stream.concat(runningComplyingTasks.keySet().stream(),
+
runningNonComplyingTasks.keySet().stream()).collect(Collectors.toSet());
+ runningComplyingTasks.clear();
+ runningNonComplyingTasks.clear();
+ return taskIds;
}
/**
@@ -98,25 +103,23 @@ public ExecutorRepresenter(final String executorId,
* @param task
*/
public void onTaskScheduled(final Task task) {
- runningTasks.add(task);
+ (task.getPropertyValue(ExecutorSlotComplianceProperty.class).orElse(true)
+ ? runningComplyingTasks :
runningNonComplyingTasks).put(task.getTaskId(), task);
runningTaskToAttempt.put(task, task.getAttemptIdx());
failedTasks.remove(task);
- serializationExecutorService.submit(new Runnable() {
- @Override
- public void run() {
- final byte[] serialized = SerializationUtils.serialize(task);
- sendControlMessage(
- ControlMessage.Message.newBuilder()
- .setId(RuntimeIdGenerator.generateMessageId())
- .setListenerId(MessageEnvironment.EXECUTOR_MESSAGE_LISTENER_ID)
- .setType(ControlMessage.MessageType.ScheduleTask)
- .setScheduleTaskMsg(
- ControlMessage.ScheduleTaskMsg.newBuilder()
- .setTask(ByteString.copyFrom(serialized))
- .build())
- .build());
- }
+ serializationExecutorService.submit(() -> {
+ final byte[] serialized = SerializationUtils.serialize(task);
+ sendControlMessage(
+ ControlMessage.Message.newBuilder()
+ .setId(RuntimeIdGenerator.generateMessageId())
+ .setListenerId(MessageEnvironment.EXECUTOR_MESSAGE_LISTENER_ID)
+ .setType(ControlMessage.MessageType.ScheduleTask)
+ .setScheduleTaskMsg(
+ ControlMessage.ScheduleTaskMsg.newBuilder()
+ .setTask(ByteString.copyFrom(serialized))
+ .build())
+ .build());
});
}
@@ -133,11 +136,7 @@ public void sendControlMessage(final
ControlMessage.Message message) {
*
*/
public void onTaskExecutionComplete(final String taskId) {
- Task completedTask = runningTasks.stream()
- .filter(task -> task.getTaskId().equals(taskId)).findFirst()
- .orElseThrow(() -> new RuntimeException("Completed task not found in
its ExecutorRepresenter"));
-
- runningTasks.remove(completedTask);
+ final Task completedTask = removeFromRunningTasks(taskId);
runningTaskToAttempt.remove(completedTask);
completeTasks.add(completedTask);
}
@@ -147,11 +146,7 @@ public void onTaskExecutionComplete(final String taskId) {
* @param taskId id of the Task
*/
public void onTaskExecutionFailed(final String taskId) {
- Task failedTask = runningTasks.stream()
- .filter(task -> task.getTaskId().equals(taskId)).findFirst()
- .orElseThrow(() -> new RuntimeException("Failed task not found in its
ExecutorRepresenter"));
-
- runningTasks.remove(failedTask);
+ final Task failedTask = removeFromRunningTasks(taskId);
runningTaskToAttempt.remove(failedTask);
failedTasks.add(failedTask);
}
@@ -167,7 +162,29 @@ public int getExecutorCapacity() {
* @return the current snapshot of set of Tasks that are running in this
executor.
*/
public Set<Task> getRunningTasks() {
- return Collections.unmodifiableSet(new HashSet<>(runningTasks));
+ return Stream.concat(runningComplyingTasks.values().stream(),
+
runningNonComplyingTasks.values().stream()).collect(Collectors.toSet());
+ }
+
+ /**
+ * @return the number of running {@link Task}s.
+ */
+ public int getNumOfRunningTasks() {
+ return getNumOfComplyingRunningTasks() +
getNumOfNonComplyingRunningTasks();
+ }
+
+ /**
+ * @return the number of running {@link Task}s that complies to the executor
slot restriction.
+ */
+ public int getNumOfComplyingRunningTasks() {
+ return runningComplyingTasks.size();
+ }
+
+ /**
+ * @return the number of running {@link Task}s that does not comply to the
executor slot restriction.
+ */
+ public int getNumOfNonComplyingRunningTasks() {
+ return runningNonComplyingTasks.size();
}
/**
@@ -202,10 +219,28 @@ public void shutDown() {
public String toString() {
final StringBuffer sb = new StringBuffer("ExecutorRepresenter{");
sb.append("executorId='").append(executorId).append('\'');
- sb.append(", runningTasks=").append(runningTasks);
+ sb.append(", runningTasks=").append(getRunningTasks());
sb.append(", failedTasks=").append(failedTasks);
sb.append('}');
return sb.toString();
}
+
+ /**
+ * Removes the specified {@link Task} from the map of running tasks.
+ *
+ * @param taskId id of the task to remove
+ * @return the removed {@link Task}
+ */
+ private Task removeFromRunningTasks(final String taskId) {
+ final Task task;
+ if (runningComplyingTasks.containsKey(taskId)) {
+ task = runningComplyingTasks.remove(taskId);
+ } else if (runningNonComplyingTasks.containsKey(taskId)) {
+ task = runningNonComplyingTasks.remove(taskId);
+ } else {
+ throw new RuntimeException(String.format("Task %s not found in its
ExecutorRepresenter", taskId));
+ }
+ return task;
+ }
}
diff --git
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingConstraint.java
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingConstraint.java
index f25e4bc1a..c330cd45c 100644
---
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingConstraint.java
+++
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingConstraint.java
@@ -38,11 +38,6 @@ public boolean testSchedulability(final ExecutorRepresenter
executor, final Task
return true;
}
- // Count the number of tasks which are running in this executor and
complying the slot constraint.
- final long numOfComplyingTasks = executor.getRunningTasks().stream()
- .filter(runningTask ->
runningTask.getPropertyValue(ExecutorSlotComplianceProperty.class)
- .orElseGet(() -> true))
- .count();
- return numOfComplyingTasks < executor.getExecutorCapacity();
+ return executor.getNumOfComplyingRunningTasks() <
executor.getExecutorCapacity();
}
}
diff --git
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/MinOccupancyFirstSchedulingPolicy.java
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/MinOccupancyFirstSchedulingPolicy.java
index 1261d44a8..0fa51b582 100644
---
a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/MinOccupancyFirstSchedulingPolicy.java
+++
b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/MinOccupancyFirstSchedulingPolicy.java
@@ -38,7 +38,7 @@ private MinOccupancyFirstSchedulingPolicy() {
public ExecutorRepresenter selectExecutor(final
Collection<ExecutorRepresenter> executors, final Task task) {
final OptionalInt minOccupancy =
executors.stream()
- .map(executor -> executor.getRunningTasks().size())
+ .map(executor -> executor.getNumOfRunningTasks())
.mapToInt(i -> i).min();
if (!minOccupancy.isPresent()) {
@@ -46,7 +46,7 @@ public ExecutorRepresenter selectExecutor(final
Collection<ExecutorRepresenter>
}
return executors.stream()
- .filter(executor -> executor.getRunningTasks().size() ==
minOccupancy.getAsInt())
+ .filter(executor -> executor.getNumOfRunningTasks() ==
minOccupancy.getAsInt())
.findFirst()
.orElseThrow(() -> new RuntimeException("No such executor"));
}
diff --git
a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingConstraintTest.java
b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingConstraintTest.java
index e387c709f..871be92fc 100644
---
a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingConstraintTest.java
+++
b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/FreeSlotSchedulingConstraintTest.java
@@ -19,7 +19,6 @@
import edu.snu.nemo.runtime.common.plan.Task;
import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
import org.apache.reef.tang.Tang;
-import org.apache.reef.tang.exceptions.InjectionException;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -28,7 +27,6 @@
import java.util.*;
import java.util.stream.Collectors;
-import java.util.stream.IntStream;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.*;
@@ -46,42 +44,21 @@
@Before
public void setUp() throws Exception {
schedulingConstraint =
Tang.Factory.getTang().newInjector().getInstance(FreeSlotSchedulingConstraint.class);
- a0 = mockExecutorRepresenter(1, 1, 1);
- a1 = mockExecutorRepresenter(2, 2, 3);
- }
-
- /**
- * Mock a task.
- *
- * @param taskId the ID of the task to mock.
- * @return the mocked task.
- */
- private static Task mockTask(final String taskId) {
- final Task task = mock(Task.class);
- when(task.getTaskId()).thenReturn(taskId);
- return task;
+ a0 = mockExecutorRepresenter(1, 1);
+ a1 = mockExecutorRepresenter(2, 3);
}
/**
* Mock an executor representer.
*
* @param numComplyingTasks the number of already running (mocked) tasks
which comply slot constraint in the executor.
- * @param numIgnoringTasks the number of already running (mocked) tasks
which ignore slot constraint in the executor.
* @param capacity the capacity of the executor.
* @return the mocked executor.
*/
private static ExecutorRepresenter mockExecutorRepresenter(final int
numComplyingTasks,
- final int
numIgnoringTasks,
final int
capacity) {
final ExecutorRepresenter executorRepresenter =
mock(ExecutorRepresenter.class);
- final Set<Task> runningTasks = new HashSet<>();
- IntStream.range(0, numComplyingTasks).forEach(i ->
runningTasks.add(mockTask(String.valueOf(i))));
- IntStream.range(0, numIgnoringTasks).forEach(i -> {
- final Task task = mockTask(String.valueOf(numComplyingTasks + i));
-
when(task.getPropertyValue(ExecutorSlotComplianceProperty.class)).thenReturn(Optional.of(false));
- runningTasks.add(task);
- });
- when(executorRepresenter.getRunningTasks()).thenReturn(runningTasks);
+
when(executorRepresenter.getNumOfComplyingRunningTasks()).thenReturn(numComplyingTasks);
when(executorRepresenter.getExecutorCapacity()).thenReturn(capacity);
return executorRepresenter;
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services