This is an automated email from the ASF dual-hosted git repository.

wuyi pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new f20c408  [SPARK-37481][CORE][WEBUI] Fix disappearance of skipped 
stages after they retry
f20c408 is described below

commit f20c408be0cd2f53815b4b26a88aa8c2e3545a5c
Author: Kent Yao <[email protected]>
AuthorDate: Mon Dec 13 10:40:42 2021 +0800

    [SPARK-37481][CORE][WEBUI] Fix disappearance of skipped stages after they 
retry
    
    ### What changes were proposed in this pull request?
    
    When skipped stages retry, their skipped info will be lost on the UI, and 
then we may see a stage with 200 tasks indeed, shows that it only has 3 tasks 
but its `retry 1` has 15 tasks and completely different inputs/outputs.
    
    A simple way to reproduce,
    
    ```
    bin/spark-sql --packages com.github.yaooqinn:itachi_2.12:0.3.0
    ```
    
    and run
    
    ```
    select * from (select v from (values (1), (2), (3) t(v))) t1 join (select 
stage_id_with_retry(3) from (select v from values (1), (2), (3) t(v) group by 
v)) t2;
    ```
    
    Also, Detailed in the Gist here - 
https://gist.github.com/yaooqinn/6acb7b74b343a6a6dffe8401f6b7b45c
    
    In this PR, we increase the nextAttempIds of these skipped stages once they 
get visited.
    
    ### Why are the changes needed?
    
    fix problems when we have skipped stage retries.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, the UI will keep the skipped stages info
    
    
![image](https://user-images.githubusercontent.com/8326978/144010378-02a688ce-0ead-4c41-ab9b-bc5fce4f8b90.png)
    
    ### How was this patch tested?
    
    manually as recorded in 
https://gist.github.com/yaooqinn/6acb7b74b343a6a6dffe8401f6b7b45c
    
    existing tests
    
    Closes #34735 from yaooqinn/SPARK-37481.
    
    Authored-by: Kent Yao <[email protected]>
    Signed-off-by: yi.wu <[email protected]>
    (cherry picked from commit 5880df41f50210f2df44f25640437d99b8979d70)
    Signed-off-by: yi.wu <[email protected]>
---
 core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala  | 6 ++++++
 core/src/main/scala/org/apache/spark/scheduler/Stage.scala         | 7 +++++++
 .../test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala  | 6 +++---
 3 files changed, 16 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index bb77a58d..9a27d9c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -716,6 +716,12 @@ private[spark] class DAGScheduler(
                 // read from merged output as the MergeStatuses are not 
available.
                 if (!mapStage.isAvailable || 
!mapStage.shuffleDep.shuffleMergeFinalized) {
                   missing += mapStage
+                } else {
+                  // Forward the nextAttemptId if skipped and get visited for 
the first time.
+                  // Otherwise, once it gets retried,
+                  // 1) the stuffs in stage info become distorting, e.g. task 
num, input byte, e.t.c
+                  // 2) the first attempt starts from 0-idx, it will not be 
marked as a retry
+                  mapStage.increaseAttemptIdOnFirstSkip()
                 }
               case narrowDep: NarrowDependency[_] =>
                 waitingForVisit.prepend(narrowDep.rdd)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala 
b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
index ae7924d..9707211 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
@@ -107,6 +107,13 @@ private[scheduler] abstract class Stage(
     nextAttemptId += 1
   }
 
+  /** Forward the nextAttemptId if skipped and get visited for the first time. 
*/
+  def increaseAttemptIdOnFirstSkip(): Unit = {
+    if (nextAttemptId == 0) {
+      nextAttemptId = 1
+    }
+  }
+
   /** Returns the StageInfo for the most recent attempt for this stage. */
   def latestInfo: StageInfo = _latestInfo
 
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 4cb64ed..afea912 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -1622,7 +1622,7 @@ class DAGSchedulerSuite extends SparkFunSuite with 
TempLocalSparkContext with Ti
     // the shuffle map output is still available from stage 0); make sure 
we've still got internal
     // accumulators setup
     assert(scheduler.stageIdToStage(2).latestInfo.taskMetrics != null)
-    completeShuffleMapStageSuccessfully(2, 0, 2)
+    completeShuffleMapStageSuccessfully(2, 1, 2)
     completeNextResultStageWithSuccess(3, 1, idx => idx + 1234)
     assert(results === Map(0 -> 1234, 1 -> 1235))
 
@@ -3514,7 +3514,7 @@ class DAGSchedulerSuite extends SparkFunSuite with 
TempLocalSparkContext with Ti
 
     // Check if same merger locs is reused for the new stage with shared 
shuffle dependency
     assert(mergerLocs.zip(newMergerLocs).forall(x => x._1.host == x._2.host))
-    completeShuffleMapStageSuccessfully(2, 0, 2)
+    completeShuffleMapStageSuccessfully(2, 1, 2)
     completeNextResultStageWithSuccess(3, 1, idx => idx + 1234)
     assert(results === Map(0 -> 1234, 1 -> 1235))
 
@@ -3650,7 +3650,7 @@ class DAGSchedulerSuite extends SparkFunSuite with 
TempLocalSparkContext with Ti
     // the scheduler now creates a new task set to regenerate the missing map 
output, but this time
     // using a different stage, the "skipped" one
     assert(scheduler.stageIdToStage(2).latestInfo.taskMetrics != null)
-    completeShuffleMapStageSuccessfully(2, 0, 2)
+    completeShuffleMapStageSuccessfully(2, 1, 2)
     completeNextResultStageWithSuccess(3, 1, idx => idx + 1234)
 
     val expected = (0 until parts).map(idx => (idx, idx + 1234))

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to