seojangho commented on a change in pull request #1: [NEMO-26] Implement 
SourceLocationAwareSchedulingPolicy
URL: https://github.com/apache/incubator-nemo/pull/1#discussion_r173069497
 
 

 ##########
 File path: tests/src/test/java/edu/snu/nemo/tests/runtime/RuntimeTestUtil.java
 ##########
 @@ -15,123 +15,94 @@
  */
 package edu.snu.nemo.tests.runtime;
 
-import edu.snu.nemo.common.dag.DAG;
-import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
-import edu.snu.nemo.runtime.common.plan.RuntimeEdge;
 import edu.snu.nemo.runtime.common.plan.physical.*;
-import edu.snu.nemo.runtime.common.state.BlockState;
 import edu.snu.nemo.runtime.common.state.StageState;
 import edu.snu.nemo.runtime.common.state.TaskGroupState;
 import edu.snu.nemo.runtime.master.JobStateManager;
-import edu.snu.nemo.runtime.master.BlockManagerMaster;
-import edu.snu.nemo.runtime.master.resource.ContainerManager;
+import edu.snu.nemo.runtime.master.scheduler.ExecutorRegistry;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 import edu.snu.nemo.runtime.master.scheduler.PendingTaskGroupQueue;
 import edu.snu.nemo.runtime.master.scheduler.Scheduler;
 import edu.snu.nemo.runtime.master.scheduler.SchedulingPolicy;
 import org.apache.beam.sdk.values.KV;
 
 import java.util.*;
-import java.util.concurrent.BlockingDeque;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingDeque;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 /**
  * Utility class for runtime unit tests.
  */
 public final class RuntimeTestUtil {
-  private static ExecutorService completionEventThreadPool;
-  private static BlockingDeque<Runnable> eventRunnableQueue;
-  private static boolean testComplete;
-
-  public static void initialize() {
-    testComplete = false;
-    completionEventThreadPool = Executors.newFixedThreadPool(5);
-
-    eventRunnableQueue = new LinkedBlockingDeque<>();
-
-    for (int i = 0; i < 5; i++) {
-      completionEventThreadPool.execute(() -> {
-        while (!testComplete || !eventRunnableQueue.isEmpty()) {
-          try {
-            final Runnable event = eventRunnableQueue.takeFirst();
-            event.run();
-          } catch (InterruptedException e) {
-            e.printStackTrace();
-          }
-        }
-      });
-    }
-    completionEventThreadPool.shutdown();
-  }
-
-  public static void cleanup() {
-    testComplete = true;
-  }
-
   /**
    * Sends a stage's completion event to scheduler, with all its task groups 
marked as complete as well.
    * This replaces executor's task group completion messages for testing 
purposes.
    * @param jobStateManager for the submitted job.
    * @param scheduler for the submitted job.
-   * @param containerManager used for testing purposes.
+   * @param executorRegistry provides executor representers
    * @param physicalStage for which the states should be marked as complete.
    */
   public static void sendStageCompletionEventToScheduler(final JobStateManager 
jobStateManager,
                                                          final Scheduler 
scheduler,
-                                                         final 
ContainerManager containerManager,
+                                                         final 
ExecutorRegistry executorRegistry,
                                                          final PhysicalStage 
physicalStage,
                                                          final int attemptIdx) 
{
-    eventRunnableQueue.add(new Runnable() {
-      @Override
-      public void run() {
-        while 
(jobStateManager.getStageState(physicalStage.getId()).getStateMachine().getCurrentState()
-            == StageState.State.EXECUTING) {
-          physicalStage.getTaskGroupIds().forEach(taskGroupId -> {
-            if 
(jobStateManager.getTaskGroupState(taskGroupId).getStateMachine().getCurrentState()
-                == TaskGroupState.State.EXECUTING) {
-              sendTaskGroupStateEventToScheduler(scheduler, containerManager, 
taskGroupId,
-                  TaskGroupState.State.COMPLETE, attemptIdx, null);
-            }
-          });
-        }
+    // Loop until the stage completes.
 
 Review comment:
   Okay.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to