[ https://issues.apache.org/jira/browse/SPARK-49472?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Yuming Wang updated SPARK-49472:
--------------------------------
Description:
Tasks are re-run many times and a long-running stage cannot complete when
both {{spark.decommission.enabled}} and {{spark.shuffle.service.enabled}} are
enabled.
!stage.png|height=70,width=600!
!task.png|height=70,width=600!
Below is the additional logging we added to {{TaskSetManager}}:
{noformat}
---
.../scala/org/apache/spark/scheduler/TaskSetManager.scala | 8 ++++++--
1 file changed, 6 insertions(+), 2 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index a1a54daf5f8..5846827c832 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -1117,6 +1117,7 @@ private[spark] class TaskSetManager(
/** Called by TaskScheduler when an executor is lost so we can re-enqueue our tasks */
override def executorLost(execId: String, host: String, reason: ExecutorLossReason): Unit = {
+ logInfo(s"Executor lost: execId: $execId, host: $host, reason: $reason.")
// Re-enqueue any tasks with potential shuffle data loss that ran on the failed executor
// if this is a shuffle map stage, and we are not using an external shuffle server which
// could serve the shuffle outputs or the executor lost is caused by decommission (which
@@ -1148,8 +1149,11 @@ private[spark] class TaskSetManager(
// This shouldn't not happen ideally since TaskSetManager handles executor lost first
// before DAGScheduler. So the map statues for the successful task must be available
// at this moment. keep it here in case the handling order changes.
- locationOpt.exists(_.host != host)
-
+ val isShuffleMapOutputLoss = locationOpt.exists(_.host != host)
+ logInfo(s"Is shuffle map output available: partition id: ${info.partitionId}, " +
+ s"tid: ${tid}, locationOpt: ${locationOpt}, " +
+ s"isShuffleMapOutputLoss: ${isShuffleMapOutputLoss}.")
+ isShuffleMapOutputLoss
case _ => false
}
// We may have a running task whose partition has been marked as successful,
{noformat}
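For readers less familiar with {{TaskSetManager.executorLost}}, the expression the patch wraps can be modeled on its own. The sketch below is a simplified standalone model, not Spark code: {{BlockLocation}} and {{DecommissionResubmitModel}} are hypothetical names, {{BlockLocation}} stands in for {{BlockManagerId}}, and it assumes the re-enqueue only applies to tasks that already finished successfully and are no longer running (the exact guard in Spark is not shown in the diff). A map output reported on a different host than the lost executor counts as migrated (available); a location on the same host, or no location at all, leads to the finished task being re-enqueued.

```scala
// Hypothetical stand-in for Spark's BlockManagerId: (executor id, host, port).
final case class BlockLocation(executorId: String, host: String, port: Int)

object DecommissionResubmitModel {
  // Same expression as `locationOpt.exists(_.host != host)` in the diff:
  // true only when the tracker reports the output on a host other than the lost one.
  //   Some(location on the same host) -> false (output assumed lost with the executor)
  //   None                            -> false (no map status registered)
  def isOutputAvailable(locationOpt: Option[BlockLocation], lostHost: String): Boolean =
    locationOpt.exists(_.host != lostHost)

  // Assumed guard: only a task that already succeeded and is not running is re-enqueued.
  def shouldResubmit(
      successful: Boolean,
      running: Boolean,
      locationOpt: Option[BlockLocation],
      lostHost: String): Boolean =
    successful && !running && !isOutputAvailable(locationOpt, lostHost)
}
```

For example, {{shouldResubmit(true, false, Some(BlockLocation("1608", "host-a", 7337)), "host-a")}} is {{true}}: the output is reported on the lost host, so the model treats it as lost.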
Output:
{noformat}
24/08/26 16:56:22 INFO YarnClusterSchedulerBackend: Decommission executors: 1608
24/08/26 16:56:22 INFO YarnClusterSchedulerBackend: Notify executor 1608 to decommission.
24/08/26 16:56:22 INFO BlockManagerMasterEndpoint: Mark BlockManagers (BlockManagerId(1608, hdc42-mcc10-01-0110-7302-007.company.com, 30502, None)) as being decommissioning.
24/08/26 16:56:22 INFO ExecutorAllocationManager: Executors 1608 removed due to idle timeout.
24/08/26 16:56:23 INFO TaskSetManager: Finished task 4992.1 in stage 7.0 (TID 16851) in 807662 ms on hdc42-mcc10-01-0710-4001-001.company.com (executor 1300) (5000/6000)
24/08/26 16:56:23 INFO TaskSetManager: Finished task 1335.2 in stage 7.0 (TID 16713) in 903010 ms on hdc42-mcc10-01-0110-7303-009.company.com (executor 1141) (5001/6000)
24/08/26 16:56:23 INFO TaskSetManager: Finished task 2290.1 in stage 7.0 (TID 16573) in 1115189 ms on hdc42-mcc10-01-1110-3305-038.company (executor 568) (5002/6000)
24/08/26 16:56:23 INFO TaskSetManager: Finished task 349.1 in stage 7.0 (TID 16916) in 777120 ms on hdc42-mcc10-01-0110-5803-003.company.com (executor 1345) (5003/6000)
24/08/26 16:56:23 INFO YarnAllocator: Driver requested a total number of 499 executor(s) for resource profile id: 0.
24/08/26 16:56:24 INFO YarnClusterScheduler: Executor 1608 on hdc42-mcc10-01-0110-7302-007.company.com is decommissioned after 1.3 s.
24/08/26 16:56:24 INFO TaskSetManager: Executor lost: execId: 1608, host: hdc42-mcc10-01-0110-7302-007.company.com, reason: Executor decommission: spark scale down.
24/08/26 16:56:24 INFO TaskSetManager: Is shuffle map output available: partition id: 3749, tid: 10302, locationOpt: Some(BlockManagerId(1608, hdc42-mcc10-01-0110-7302-007.company.com, 7337, None)), isShuffleMapOutputLoss: false.
24/08/26 16:56:24 INFO DAGScheduler: Resubmitted ShuffleMapTask(7, 3749), so marking it as still running.
24/08/26 16:56:24 INFO TaskSetManager: Is shuffle map output available: partition id: 763, tid: 16636, locationOpt: Some(BlockManagerId(1608, hdc42-mcc10-01-0110-7302-007.company.com, 7337, None)), isShuffleMapOutputLoss: false.
24/08/26 16:56:24 INFO DAGScheduler: Resubmitted ShuffleMapTask(7, 763), so marking it as still running.
24/08/26 16:56:24 INFO TaskSetManager: Is shuffle map output available: partition id: 2971, tid: 15433, locationOpt: Some(BlockManagerId(1608, hdc42-mcc10-01-0110-7302-007.company.com, 7337, None)), isShuffleMapOutputLoss: false.
24/08/26 16:56:24 INFO DAGScheduler: Resubmitted ShuffleMapTask(7, 2971), so marking it as still running.
24/08/26 16:56:24 INFO TaskSetManager: Is shuffle map output available: partition id: 5835, tid: 16587, locationOpt: Some(BlockManagerId(1608, hdc42-mcc10-01-0110-7302-007.company.com, 7337, None)), isShuffleMapOutputLoss: false.
24/08/26 16:56:24 INFO DAGScheduler: Resubmitted ShuffleMapTask(7, 5835), so marking it as still running.
24/08/26 16:56:24 INFO TaskSetManager: Is shuffle map output available: partition id: 611, tid: 15118, locationOpt: Some(BlockManagerId(1608, hdc42-mcc10-01-0110-7302-007.company.com, 7337, None)), isShuffleMapOutputLoss: false.
24/08/26 16:56:24 INFO DAGScheduler: Resubmitted ShuffleMapTask(7, 611), so marking it as still running.
24/08/26 16:56:24 INFO TaskSetManager: Is shuffle map output available: partition id: 3750, tid: 10303, locationOpt: Some(BlockManagerId(1608, hdc42-mcc10-01-0110-7302-007.company.com, 7337, None)), isShuffleMapOutputLoss: false.
24/08/26 16:56:24 INFO DAGScheduler: Resubmitted ShuffleMapTask(7, 3750), so marking it as still running.
24/08/26 16:56:24 INFO TaskSetManager: Is shuffle map output available: partition id: 3740, tid: 13610, locationOpt: Some(BlockManagerId(1608, hdc42-mcc10-01-0110-7302-007.company.com, 7337, None)), isShuffleMapOutputLoss: false.
24/08/26 16:56:24 INFO DAGScheduler: Resubmitted ShuffleMapTask(7, 3740), so marking it as still running.
24/08/26 16:56:24 INFO DAGScheduler: Executor lost: 1608 (epoch 1)
24/08/26 16:56:24 INFO BlockManagerMasterEndpoint: Trying to remove executor 1608 from BlockManagerMaster.
{noformat}
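Reading the log against the diff: every logged location is {{BlockManagerId(1608, <decommissioned host>, 7337, None)}}. Port 7337 is the default {{spark.shuffle.service.port}}, so these appear to be map outputs registered with the external shuffle service on the same host, which is presumably still running because the executor was removed for idle timeout ("spark scale down") rather than a node failure. The host check only accepts outputs on another host, so it returns false for each of them and each logged map task is resubmitted. The sketch below replays those logged values through that check and through one possible alternative. {{ShuffleLocation}}, {{EssAwareCheck}}, {{availableWithEss}} and {{LoggedCase}} are hypothetical names, this is not the fix adopted for this issue, and treating shuffle-service outputs as available assumes the node and its shuffle service outlive the decommissioned executor, which may not hold when the whole node is being removed.

```scala
// Hypothetical stand-in for Spark's BlockManagerId: (executor id, host, port).
final case class ShuffleLocation(executorId: String, host: String, port: Int)

object EssAwareCheck {
  // Default value of spark.shuffle.service.port.
  val DefaultShuffleServicePort: Int = 7337

  // The check as it stands in the diff: available only if the output lives on another host.
  def availableToday(loc: Option[ShuffleLocation], lostHost: String): Boolean =
    loc.exists(_.host != lostHost)

  // Hypothetical alternative (not Spark's fix): when the external shuffle service is
  // enabled, an output registered under the service port is also treated as available,
  // assuming the node and its shuffle service survive the executor's decommission.
  def availableWithEss(
      loc: Option[ShuffleLocation],
      lostHost: String,
      essEnabled: Boolean,
      essPort: Int = DefaultShuffleServicePort): Boolean =
    loc.exists(l => l.host != lostHost || (essEnabled && l.port == essPort))
}

// Values taken from the log: partitions resubmitted after executor 1608 was lost.
object LoggedCase {
  val lostHost: String = "hdc42-mcc10-01-0110-7302-007.company.com"
  val partitions: Seq[Int] = Seq(3749, 763, 2971, 5835, 611, 3750, 3740)
  val location: Option[ShuffleLocation] = Some(ShuffleLocation("1608", lostHost, 7337))
}
```

With the logged values, {{availableToday}} is {{false}} (so the task is resubmitted), while the hypothetical {{availableWithEss}} with {{essEnabled = true}} is {{true}}.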
was:
Tasks are re-run many times and a long-running stage cannot complete when
both {{spark.decommission.enabled}} and {{spark.shuffle.service.enabled}} are
enabled.
!stage.png|thumbnail!
!task.png|thumbnail!
!stage.png|height=250,width=250!
> Resubmit the task on executor decommission
> ------------------------------------------
>
> Key: SPARK-49472
> URL: https://issues.apache.org/jira/browse/SPARK-49472
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 3.5.0
> Reporter: Yuming Wang
> Priority: Major
> Attachments: stage.png, task.png
--
This message was sent by Atlassian Jira
(v8.20.10#820010)