sumitagrawl commented on code in PR #7517:
URL: https://github.com/apache/ozone/pull/7517#discussion_r1904280187


##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/TaskStatusService.java:
##########
@@ -36,18 +34,20 @@
 @Produces(MediaType.APPLICATION_JSON)
 public class TaskStatusService {
 
-  @Inject
   private ReconTaskStatusDao reconTaskStatusDao;
 
+  @Inject
+  TaskStatusService(ReconTaskStatusDao reconTaskStatusDao) {

Review Comment:
   This change is not related to this PR; please revert it.



##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconDeadNodeHandler.java:
##########
@@ -83,8 +83,8 @@ public void onMessage(final DatanodeDetails datanodeDetails,
         LOG.warn("Node {} has reached DEAD state, but SCM does not have " +
             "information about it.", datanodeDetails);
       }
-      containerHealthTask.triggerContainerHealthCheck();
-      pipelineSyncTask.triggerPipelineSyncTask();
+      containerHealthTask.initializeAndRunTask();

Review Comment:
   The locking logic should be kept inside initializeAndRunTask() only, not in the callers.



##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/fsck/ContainerHealthTask.java:
##########
@@ -86,43 +83,46 @@ public class ContainerHealthTask extends ReconScmTask {
 
   private final OzoneConfiguration conf;
 
+  private final ReconTaskStatusUpdater taskStatusUpdater;
+
   @SuppressWarnings("checkstyle:ParameterNumber")
   public ContainerHealthTask(
       ContainerManager containerManager,
       StorageContainerServiceProvider scmClient,
-      ReconTaskStatusDao reconTaskStatusDao,
       ContainerHealthSchemaManager containerHealthSchemaManager,
       PlacementPolicy placementPolicy,
       ReconTaskConfig reconTaskConfig,
       ReconContainerMetadataManager reconContainerMetadataManager,
-      OzoneConfiguration conf) {
-    super(reconTaskStatusDao);
+      OzoneConfiguration conf, ReconTaskStatusUpdaterManager 
taskStatusUpdaterManager) {
+    super(taskStatusUpdaterManager);
     this.scmClient = scmClient;
     this.containerHealthSchemaManager = containerHealthSchemaManager;
     this.reconContainerMetadataManager = reconContainerMetadataManager;
     this.placementPolicy = placementPolicy;
     this.containerManager = containerManager;
     this.conf = conf;
     interval = reconTaskConfig.getMissingContainerTaskInterval().toMillis();
+    this.taskStatusUpdater = getTaskStatusUpdater();
   }
 
   @Override
   public void run() {
     try {
       while (canRun()) {
-        triggerContainerHealthCheck();
+        initializeAndRunTask();
         Thread.sleep(interval);
       }
     } catch (Throwable t) {
       LOG.error("Exception in Missing Container task Thread.", t);
+      taskStatusUpdater.setLastTaskRunStatus(-1);

Review Comment:
   There is no need to set the value here; it is already handled inside initializeAndRunTask().



##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskControllerImpl.java:
##########
@@ -234,28 +227,55 @@ public synchronized void stop() {
   }
 
   /**
-   * Wait on results of all tasks.
-   * @param results Set of Futures.
-   * @param events Events.
-   * @return List of failed task names
-   * @throws ExecutionException execution Exception
-   * @throws InterruptedException Interrupted Exception
+   * For a given list of {@link Callable} tasks process them and add any 
failed task to the provided list.
+   * The tasks are executed in parallel, but will wait for the tasks to 
complete i.e. the longest
+   * time taken by this method will be the time taken by the longest task in 
the list.
+   * @param tasks A list of tasks to execute.
+   * @param events A batch of {@link OMUpdateEventBatch} events to fetch 
sequence number of last event in batch.
+   * @param failedTasks Reference of the list to which we want to add the 
failed tasks for retry/reprocessing
    */
-  private List<String> processTaskResults(List<Future<Pair<String, Boolean>>>
-                                              results,
-                                          OMUpdateEventBatch events)
-      throws ExecutionException, InterruptedException {
-    List<String> failedTasks = new ArrayList<>();
-    for (Future<Pair<String, Boolean>> f : results) {
-      String taskName = f.get().getLeft();
-      if (!f.get().getRight()) {
-        LOG.info("Failed task : {}", taskName);
-        failedTasks.add(f.get().getLeft());
-      } else {
-        taskFailureCounter.get(taskName).set(0);
-        storeLastCompletedTransaction(taskName, 
events.getLastSequenceNumber());
-      }
+  private void processTasks(Collection<NamedCallableTask<Pair<String, 
Boolean>>> tasks,
+                            OMUpdateEventBatch events, List<String> 
failedTasks) {
+    List<CompletableFuture<Void>> futures = tasks.stream()
+        .map(task -> CompletableFuture.supplyAsync(() -> {
+          try {
+            return task.call();
+          } catch (Exception e) {
+            throw new TaskExecutionException(task.getTaskName(), e);
+          }
+        }, executorService).thenAccept(result -> {
+          String taskName = result.getLeft();
+          ReconTaskStatusUpdater taskStatusUpdater = 
taskStatusUpdaterManager.getTaskStatusUpdater(taskName);
+          if (!result.getRight()) {
+            LOG.error("Task {} failed", taskName);
+            failedTasks.add(result.getLeft());
+            taskStatusUpdater.setLastTaskRunStatus(-1);
+          } else {
+            taskFailureCounter.get(taskName).set(0);
+            taskStatusUpdater.setLastTaskRunStatus(0);
+            
taskStatusUpdater.setLastUpdatedSeqNumber(events.getLastSequenceNumber());
+          }
+          taskStatusUpdater.recordRunCompletion();
+        }).exceptionally(ex -> {
+          LOG.error("Task failed with exception: ", ex);

Review Comment:
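   Please verify in a unit test that the CompletableFuture chain behaves correctly in the exception case (i.e. that exceptionally() is reached and the failure is recorded).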



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to