devabhishekpal commented on code in PR #7517:
URL: https://github.com/apache/ozone/pull/7517#discussion_r1904410441
##########
hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/ReconTaskControllerImpl.java:
##########
@@ -234,28 +227,55 @@ public synchronized void stop() {
}
/**
- * Wait on results of all tasks.
- * @param results Set of Futures.
- * @param events Events.
- * @return List of failed task names
- * @throws ExecutionException execution Exception
- * @throws InterruptedException Interrupted Exception
+ * For a given list of {@link Callable} tasks, process them and add any
+ * failed task to the provided list.
+ * The tasks are executed in parallel, but this method will wait for the
+ * tasks to complete, i.e. the longest time taken by this method will be
+ * the time taken by the longest task in the list.
+ * @param tasks A list of tasks to execute.
+ * @param events A batch of {@link OMUpdateEventBatch} events, used to fetch
+ * the sequence number of the last event in the batch.
+ * @param failedTasks Reference to the list to which we want to add the
+ * failed tasks for retry/reprocessing.
*/
- private List<String> processTaskResults(List<Future<Pair<String, Boolean>>>
- results,
- OMUpdateEventBatch events)
- throws ExecutionException, InterruptedException {
- List<String> failedTasks = new ArrayList<>();
- for (Future<Pair<String, Boolean>> f : results) {
- String taskName = f.get().getLeft();
- if (!f.get().getRight()) {
- LOG.info("Failed task : {}", taskName);
- failedTasks.add(f.get().getLeft());
- } else {
- taskFailureCounter.get(taskName).set(0);
- storeLastCompletedTransaction(taskName,
events.getLastSequenceNumber());
- }
+ private void processTasks(Collection<NamedCallableTask<Pair<String,
Boolean>>> tasks,
+ OMUpdateEventBatch events, List<String>
failedTasks) {
+ List<CompletableFuture<Void>> futures = tasks.stream()
+ .map(task -> CompletableFuture.supplyAsync(() -> {
+ try {
+ return task.call();
+ } catch (Exception e) {
+ throw new TaskExecutionException(task.getTaskName(), e);
+ }
+ }, executorService).thenAccept(result -> {
+ String taskName = result.getLeft();
+ ReconTaskStatusUpdater taskStatusUpdater =
taskStatusUpdaterManager.getTaskStatusUpdater(taskName);
+ if (!result.getRight()) {
+ LOG.error("Task {} failed", taskName);
+ failedTasks.add(result.getLeft());
+ taskStatusUpdater.setLastTaskRunStatus(-1);
+ } else {
+ taskFailureCounter.get(taskName).set(0);
+ taskStatusUpdater.setLastTaskRunStatus(0);
+
taskStatusUpdater.setLastUpdatedSeqNumber(events.getLastSequenceNumber());
+ }
+ taskStatusUpdater.recordRunCompletion();
+ }).exceptionally(ex -> {
+ LOG.error("Task failed with exception: ", ex);
Review Comment:
Added `testTaskRecordsFailureOnException` to validate that a task which fails with an exception is recorded as a failure.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]