turboFei commented on code in PR #2921:
URL: https://github.com/apache/celeborn/pull/2921#discussion_r1894519161


##########
client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java:
##########
@@ -200,7 +212,116 @@ public static void cancelShuffle(int shuffleId, String 
reason) {
         scheduler.cancelStage(shuffleMapStage.get().id(), new Some<>(reason));
       }
     } else {
-      logger.error("Can not get active SparkContext, skip cancelShuffle.");
+      LOG.error("Can not get active SparkContext, skip cancelShuffle.");
+    }
+  }
+
+  private static final DynFields.UnboundField<ConcurrentHashMap<Long, 
TaskSetManager>>
+      TASK_ID_TO_TASK_SET_MANAGER_FIELD =
+          DynFields.builder()
+              .hiddenImpl(TaskSchedulerImpl.class, "taskIdToTaskSetManager")
+              .defaultAlwaysNull()
+              .build();
+  private static final 
DynFields.UnboundField<scala.collection.mutable.HashMap<Long, TaskInfo>>
+      TASK_INFOS_FIELD =
+          DynFields.builder()
+              .hiddenImpl(TaskSetManager.class, "taskInfos")
+              .defaultAlwaysNull()
+              .build();
+
+  protected static TaskSetManager getTaskSetManager(long taskId) {
+    if (SparkContext$.MODULE$.getActive().nonEmpty()) {
+      TaskSchedulerImpl taskScheduler =
+          (TaskSchedulerImpl) 
SparkContext$.MODULE$.getActive().get().taskScheduler();
+      ConcurrentHashMap<Long, TaskSetManager> taskIdToTaskSetManager =
+          TASK_ID_TO_TASK_SET_MANAGER_FIELD.bind(taskScheduler).get();
+      return taskIdToTaskSetManager.get(taskId);
+    } else {
+      LOG.error("Can not get active SparkContext.");
+      return null;
+    }
+  }
+
+  protected static List<TaskInfo> getTaskAttempts(TaskSetManager 
taskSetManager, long taskId) {
+    if (taskSetManager != null) {
+      scala.Option<TaskInfo> taskInfoOption =
+          TASK_INFOS_FIELD.bind(taskSetManager).get().get(taskId);
+      if (taskInfoOption.isDefined()) {
+        int taskIndex = taskInfoOption.get().index();
+        return scala.collection.JavaConverters.asJavaCollectionConverter(
+                taskSetManager.taskAttempts()[taskIndex])
+            .asJavaCollection().stream()
+            .collect(Collectors.toList());
+      } else {
+        LOG.error("Can not get TaskInfo for taskId: {}", taskId);
+        return Collections.emptyList();
+      }
+    } else {
+      LOG.error("Can not get TaskSetManager for taskId: {}", taskId);
+      return Collections.emptyList();
+    }
+  }
+
+  public static Map<Integer, Set<Long>> 
reportedStageShuffleFetchFailureTaskIds =
+      JavaUtils.newConcurrentHashMap();
+
+  /**
+   * For a task that reported a shuffle fetch failure, checks whether another
+   * attempt of that task is running or successful. If another attempt
+   * (excluding the reported shuffle fetch failure tasks in the current stage)
+   * is running or successful, returns true. Otherwise, returns false.
+   */
+  public static synchronized boolean 
taskAnotherAttemptRunningOrSuccessful(long taskId) {
+    TaskSetManager taskSetManager = getTaskSetManager(taskId);
+    if (taskSetManager != null) {
+      int stageId = taskSetManager.stageId();
+      Set<Long> reportedStageTaskIds =
+          reportedStageShuffleFetchFailureTaskIds.computeIfAbsent(stageId, k 
-> new HashSet<>());
+      reportedStageTaskIds.add(taskId);
+
+      List<TaskInfo> taskAttempts = getTaskAttempts(taskSetManager, taskId);
+      Optional<TaskInfo> taskInfoOpt =
+          taskAttempts.stream().filter(ti -> ti.taskId() == 
taskId).findFirst();

Review Comment:
   Thanks, addressed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to