mridulm commented on code in PR #2921:
URL: https://github.com/apache/celeborn/pull/2921#discussion_r1894358744


##########
client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SparkUtils.java:
##########
@@ -200,7 +212,116 @@ public static void cancelShuffle(int shuffleId, String reason) {
         scheduler.cancelStage(shuffleMapStage.get().id(), new Some<>(reason));
       }
     } else {
-      logger.error("Can not get active SparkContext, skip cancelShuffle.");
+      LOG.error("Can not get active SparkContext, skip cancelShuffle.");
+    }
+  }
+
+  private static final DynFields.UnboundField<ConcurrentHashMap<Long, TaskSetManager>>
+      TASK_ID_TO_TASK_SET_MANAGER_FIELD =
+          DynFields.builder()
+              .hiddenImpl(TaskSchedulerImpl.class, "taskIdToTaskSetManager")
+              .defaultAlwaysNull()
+              .build();
+  private static final DynFields.UnboundField<scala.collection.mutable.HashMap<Long, TaskInfo>>
+      TASK_INFOS_FIELD =
+          DynFields.builder()
+              .hiddenImpl(TaskSetManager.class, "taskInfos")
+              .defaultAlwaysNull()
+              .build();
+
+  protected static TaskSetManager getTaskSetManager(long taskId) {
+    if (SparkContext$.MODULE$.getActive().nonEmpty()) {
+      TaskSchedulerImpl taskScheduler =
+          (TaskSchedulerImpl) SparkContext$.MODULE$.getActive().get().taskScheduler();
+      ConcurrentHashMap<Long, TaskSetManager> taskIdToTaskSetManager =
+          TASK_ID_TO_TASK_SET_MANAGER_FIELD.bind(taskScheduler).get();
+      return taskIdToTaskSetManager.get(taskId);
+    } else {
+      LOG.error("Can not get active SparkContext.");
+      return null;
+    }
+  }
+
+  protected static List<TaskInfo> getTaskAttempts(TaskSetManager taskSetManager, long taskId) {
+    if (taskSetManager != null) {
+      scala.Option<TaskInfo> taskInfoOption =
+          TASK_INFOS_FIELD.bind(taskSetManager).get().get(taskId);
+      if (taskInfoOption.isDefined()) {
+        int taskIndex = taskInfoOption.get().index();
+        return scala.collection.JavaConverters.asJavaCollectionConverter(
+                taskSetManager.taskAttempts()[taskIndex])
+            .asJavaCollection().stream()
+            .collect(Collectors.toList());
+      } else {
+        LOG.error("Can not get TaskInfo for taskId: {}", taskId);
+        return Collections.emptyList();
+      }
+    } else {
+      LOG.error("Can not get TaskSetManager for taskId: {}", taskId);
+      return Collections.emptyList();
+    }
+  }
+
+  public static Map<Integer, Set<Long>> reportedStageShuffleFetchFailureTaskIds =
+      JavaUtils.newConcurrentHashMap();
+
+  /**
+   * Only check for the shuffle fetch failure task whether another attempt is running or successful.
+   * If another attempt(excluding the reported shuffle fetch failure tasks in current stage) is
+   * running or successful, return true. Otherwise, return false.
+   */
+  public static synchronized boolean taskAnotherAttemptRunningOrSuccessful(long taskId) {

Review Comment:
   This has an inherent race condition: both tasks can end up not raising a
   fetch failure.
   The lock here is on `SparkUtils`, which does not prevent `TaskSetManager`
   (TSM) state from changing under us.
   Since we have to lock `TaskSchedulerImpl` anyway for `getTaskAttempts`, I
   would instead suggest synchronizing `taskAnotherAttemptRunningOrSuccessful`
   on the `TaskSchedulerImpl` instance (and not on `SparkUtils`), which
   prevents the TSM from being mutated under us.
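
   The locking change suggested above could look roughly like the sketch
   below. It is only an illustration, not the PR's code: `SchedulerStandIn`,
   `Attempt` and `FetchFailureCheckSketch` are hypothetical stand-ins for
   `TaskSchedulerImpl`, `TaskInfo` and `SparkUtils`, and it assumes the
   scheduler mutates its attempt state while holding its own monitor.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for TaskSchedulerImpl. Its mutator synchronizes on
// `this`, which is the monitor the check below has to share.
final class SchedulerStandIn {
  // Hypothetical stand-in for TaskInfo: one launched attempt of a task.
  static final class Attempt {
    final long taskId;
    final String state;

    Attempt(long taskId, String state) {
      this.taskId = taskId;
      this.state = state;
    }
  }

  // taskIndex -> every attempt launched for that index
  private final Map<Integer, List<Attempt>> attemptsByIndex = new HashMap<>();

  synchronized void recordAttempt(int taskIndex, long taskId, String state) {
    attemptsByIndex.computeIfAbsent(taskIndex, k -> new ArrayList<>())
        .add(new Attempt(taskId, state));
  }

  // Not synchronized on purpose: callers are expected to hold this object's monitor.
  List<Attempt> attemptsFor(int taskIndex) {
    return attemptsByIndex.getOrDefault(taskIndex, Collections.emptyList());
  }
}

// Hypothetical stand-in for the utility class that performs the check.
final class FetchFailureCheckSketch {
  // Locks the scheduler instance rather than this utility class, so the
  // attempt list cannot change between reading it and returning a decision.
  static boolean anotherAttemptRunningOrSuccessful(
      SchedulerStandIn scheduler, int taskIndex, long failedTaskId) {
    synchronized (scheduler) {
      for (SchedulerStandIn.Attempt attempt : scheduler.attemptsFor(taskIndex)) {
        boolean alive = "RUNNING".equals(attempt.state) || "SUCCESS".equals(attempt.state);
        if (attempt.taskId != failedTaskId && alive) {
          return true;
        }
      }
      return false;
    }
  }
}
```

   A `static synchronized` method takes the class monitor of the utility
   class, which no scheduler code path acquires, so it only serializes the
   callers of the check against each other. Sharing the scheduler's monitor
   is what makes the read of the attempts and the decision atomic with
   respect to scheduler updates.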


