ccaominh commented on a change in pull request #9353: Inject things instead of 
subclassing everything for parallel task testing
URL: https://github.com/apache/druid/pull/9353#discussion_r379702969
 
 

 ##########
 File path: 
indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java
 ##########
 @@ -151,127 +195,196 @@ protected void initializeIntermediaryDataManager() 
throws IOException
         ),
         null
     );
+    LocalShuffleClient shuffleClient = new 
LocalShuffleClient(intermediaryDataManager);
+    coordinatorClient = new LocalCoordinatorClient();
+    prepareObjectMapper(
+        objectMapper,
+        getIndexIO(),
+        indexingServiceClient,
+        indexTaskClientFactory,
+        shuffleClient,
+        coordinatorClient
+    );
   }
 
-  public class LocalIndexingServiceClient extends NoopIndexingServiceClient
+  @After
+  public void tearDownAbstractParallelIndexSupervisorTaskTest()
+  {
+    taskRunner.shutdown();
+    temporaryFolder.delete();
+  }
+
+  protected LocalIndexingServiceClient getIndexingServiceClient()
+  {
+    return indexingServiceClient;
+  }
+
+  protected IndexTaskClientFactory<ParallelIndexSupervisorTaskClient> 
getParallelIndexTaskClientFactory()
+  {
+    return indexTaskClientFactory;
+  }
+
+  protected CoordinatorClient getCoordinatorClient()
   {
-    private final ConcurrentMap<String, Future<TaskStatus>> tasks = new 
ConcurrentHashMap<>();
+    return coordinatorClient;
+  }
+
+  private static class TaskContainer
+  {
+    private final Task task;
+    @MonotonicNonNull
+    private volatile Future<TaskStatus> statusFuture;
+    @MonotonicNonNull
+    private volatile TestLocalTaskActionClient actionClient;
+
+    private TaskContainer(Task task)
+    {
+      this.task = task;
+    }
+
+    private void setStatusFuture(Future<TaskStatus> statusFuture)
+    {
+      this.statusFuture = statusFuture;
+    }
+
+    private void setActionClient(TestLocalTaskActionClient actionClient)
+    {
+      this.actionClient = actionClient;
+    }
+  }
+
+  public class SimpleThreadingTaskRunner
+  {
+    private final ConcurrentMap<String, TaskContainer> tasks = new 
ConcurrentHashMap<>();
     private final ListeningExecutorService service = 
MoreExecutors.listeningDecorator(
-        Execs.multiThreaded(5, "parallel-index-supervisor-task-test-%d")
+        Execs.multiThreaded(5, "simple-threading-task-runner-%d")
     );
 
-    @Override
-    public String runTask(Object taskObject)
+    public String run(Task task)
+    {
+      runTask(task);
+      return task.getId();
+    }
+
+    private TaskStatus runAndWait(Task task)
     {
-      final Task subTask = (Task) taskObject;
       try {
-        getTaskStorage().insert(subTask, TaskStatus.running(subTask.getId()));
+        return runTask(task).get();
       }
-      catch (EntryExistsException e) {
+      catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new RuntimeException(e);
+      }
+      catch (ExecutionException e) {
         throw new RuntimeException(e);
       }
+    }
 
-      // WARNING: In production, subtasks are created via HTTP calls and 
instantiated by Jackson, which means they
-      // cannot share objects like they can here. For example, if the indexing 
task uses JsonParseSpec, the same
-      // JSONFlattenerMaker instance is shared among subtasks, which is bad 
since JSONFlattenerMaker is not thread-safe.
-      tasks.put(subTask.getId(), service.submit(() -> {
-        try {
-          final TaskToolbox toolbox = createTaskToolbox(subTask);
-          if (subTask.isReady(toolbox.getTaskActionClient())) {
-            return subTask.run(toolbox);
-          } else {
-            getTaskStorage().setStatus(TaskStatus.failure(subTask.getId()));
-            throw new ISE("task[%s] is not ready", subTask.getId());
-          }
-        }
-        catch (Exception e) {
-          getTaskStorage().setStatus(TaskStatus.failure(subTask.getId(), 
e.getMessage()));
-          throw new RuntimeException(e);
+    private TaskStatus waitToFinish(Task task)
+    {
+      final TaskContainer taskContainer = tasks.get(task.getId());
+      if (taskContainer == null) {
+        throw new IAE("Unknown task[%s]", task.getId());
+      }
+      try {
+        while (taskContainer.statusFuture == null && 
!Thread.currentThread().isInterrupted()) {
+          Thread.sleep(10);
         }
-      }));
-      return subTask.getId();
+        return taskContainer.statusFuture.get();
 
 Review comment:
   Previously, tests would be able to specify a timeout, which is useful for 
failing tests sooner than the travis timeout.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to