johnyangk commented on a change in pull request #1: [NEMO-26] Implement
SourceLocationAwareSchedulingPolicy
URL: https://github.com/apache/incubator-nemo/pull/1#discussion_r172750797
##########
File path: tests/src/test/java/edu/snu/nemo/tests/runtime/RuntimeTestUtil.java
##########
@@ -15,123 +15,100 @@
*/
package edu.snu.nemo.tests.runtime;
-import edu.snu.nemo.common.dag.DAG;
-import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
-import edu.snu.nemo.runtime.common.plan.RuntimeEdge;
import edu.snu.nemo.runtime.common.plan.physical.*;
-import edu.snu.nemo.runtime.common.state.BlockState;
import edu.snu.nemo.runtime.common.state.StageState;
import edu.snu.nemo.runtime.common.state.TaskGroupState;
import edu.snu.nemo.runtime.master.JobStateManager;
-import edu.snu.nemo.runtime.master.BlockManagerMaster;
-import edu.snu.nemo.runtime.master.resource.ContainerManager;
+import edu.snu.nemo.runtime.master.scheduler.ExecutorRegistry;
import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
import edu.snu.nemo.runtime.master.scheduler.PendingTaskGroupQueue;
import edu.snu.nemo.runtime.master.scheduler.Scheduler;
import edu.snu.nemo.runtime.master.scheduler.SchedulingPolicy;
import org.apache.beam.sdk.values.KV;
import java.util.*;
-import java.util.concurrent.BlockingDeque;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingDeque;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
/**
* Utility class for runtime unit tests.
*/
public final class RuntimeTestUtil {
- private static ExecutorService completionEventThreadPool;
- private static BlockingDeque<Runnable> eventRunnableQueue;
- private static boolean testComplete;
-
- public static void initialize() {
- testComplete = false;
- completionEventThreadPool = Executors.newFixedThreadPool(5);
-
- eventRunnableQueue = new LinkedBlockingDeque<>();
-
- for (int i = 0; i < 5; i++) {
- completionEventThreadPool.execute(() -> {
- while (!testComplete || !eventRunnableQueue.isEmpty()) {
- try {
- final Runnable event = eventRunnableQueue.takeFirst();
- event.run();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- });
- }
- completionEventThreadPool.shutdown();
- }
-
- public static void cleanup() {
- testComplete = true;
- }
-
/**
* Sends a stage's completion event to scheduler, with all its task groups
marked as complete as well.
* This replaces executor's task group completion messages for testing
purposes.
* @param jobStateManager for the submitted job.
* @param scheduler for the submitted job.
- * @param containerManager used for testing purposes.
+ * @param executorRegistry provides executor representers
* @param physicalStage for which the states should be marked as complete.
*/
public static void sendStageCompletionEventToScheduler(final JobStateManager
jobStateManager,
final Scheduler
scheduler,
- final
ContainerManager containerManager,
+ final
ExecutorRegistry executorRegistry,
final PhysicalStage
physicalStage,
final int attemptIdx)
{
- eventRunnableQueue.add(new Runnable() {
- @Override
- public void run() {
- while
(jobStateManager.getStageState(physicalStage.getId()).getStateMachine().getCurrentState()
- == StageState.State.EXECUTING) {
- physicalStage.getTaskGroupIds().forEach(taskGroupId -> {
- if
(jobStateManager.getTaskGroupState(taskGroupId).getStateMachine().getCurrentState()
- == TaskGroupState.State.EXECUTING) {
- sendTaskGroupStateEventToScheduler(scheduler, containerManager,
taskGroupId,
- TaskGroupState.State.COMPLETE, attemptIdx, null);
- }
- });
- }
+ // Loop until the stage completes.
+ while (true) {
+ try {
+ Thread.sleep(100);
Review comment:
@seojangho Please remove L52-57. I wrote them to test something and forgot
to remove them. Sorry about this.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services