[
https://issues.apache.org/jira/browse/SPARK-28362?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Peter Backx updated SPARK-28362:
--------------------------------
Description:
It looks like the scheduler is unintentionally mounting a DoS attack on the
MapOutputTracker when many tasks are launched at the same time.
We are running a Beam on Spark job on AWS EMR 5.24.0 (Yarn, Spark 2.4.2, Beam
2.12.0).
The job is running on 150 r4.4xlarge machines in cluster mode, with executors
sized to take up a full machine each, so we have 1 machine acting as driver and
149 executors. Default parallelism is 149 * 13 (cores) * 20 = 38740.
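For clarity, the sizing above works out as follows (a trivial sketch; the numbers are the ones quoted in this report):

```java
// Parallelism sizing from this report: 149 executors (150 machines minus the
// driver), 13 cores per r4.4xlarge executor, 20 tasks per core.
public class ParallelismSizing {
    public static void main(String[] args) {
        int executors = 149;
        int coresPerExecutor = 13;
        int tasksPerCore = 20;
        int defaultParallelism = executors * coresPerExecutor * tasksPerCore;
        System.out.println(defaultParallelism); // prints 38740
    }
}
```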
When a new stage is launched, sometimes tasks will error out with the message:
"Error communicating with MapOutputTracker".
I've gone over the logs and it looks like the following is happening:
# When the final task of a previous stage completes, the driver launches new
tasks (driver log):
## 19/07/10 14:04:57 INFO DAGScheduler: Submitting 38740 missing tasks from
ShuffleMapStage 29 (MapPartitionsRDD[632] at mapToPair at
GroupCombineFunctions.java:147) (first 15 tasks are for partitions Vector(0, 1,
2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14))
# Executors use the MapOutputTracker to fetch the location of the data they
need to work on (executor log):
## 19/07/10 14:04:57 INFO MapOutputTrackerWorker: Don't have map outputs for
shuffle 36, fetching them
19/07/10 14:04:57 INFO MapOutputTrackerWorker: Doing the fetch; tracker
endpoint =
NettyRpcEndpointRef([spark://[email protected]:42033])
# Usually all executors time out after 2 minutes. In rare cases some of the
executors seem to receive a reply (executor log):
## 19/07/10 14:06:57 ERROR MapOutputTrackerWorker: Error communicating with
MapOutputTracker
org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [120
seconds]. This timeout is controlled by spark.rpc.askTimeout
at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:47)
# Driver marks the task as failed and retries (driver log):
## 19/07/10 14:06:57 WARN TaskSetManager: Lost task 1490.0 in stage 29.0 (TID
2724105, ip-172-28-94-245.eu-west-1.compute.internal, executor 248):
org.apache.spark.SparkException: Error communicating with MapOutputTracker
at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:270)
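One workaround we have considered but not yet tried is raising the timeout named in the stack trace. This is a hedged sketch, not a confirmed fix: the keys are real Spark configs, but the 600s value is a guess, not something validated on this workload.

```java
// Illustrative only: emit spark-submit --conf flags that raise the RPC ask
// timeout governing the MapOutputTracker fetch. spark.rpc.askTimeout falls
// back to spark.network.timeout (default 120s) when it is unset.
import java.util.LinkedHashMap;
import java.util.Map;

public class RpcTimeoutTuning {
    public static void main(String[] args) {
        Map<String, String> conf = new LinkedHashMap<>();
        conf.put("spark.rpc.askTimeout", "600s");
        conf.put("spark.network.timeout", "600s");
        conf.forEach((k, v) -> System.out.println("--conf " + k + "=" + v));
    }
}
```

This only masks the thundering herd rather than fixing it, but it would give the driver more time to answer all 38740 fetches.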
I can't find any log entry explaining why the executors don't get a reply from
the MapOutputTracker.
So my questions:
* Is there a separate log file for the MapOutputTracker where I can find more
info?
* Is the parallelism set too high? It seems to be fine for the rest of the job.
* Is there anything else we can do? Is there maybe a way to stagger the task
launches so they don't happen all at once?
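If it helps triage, these are the driver-side MapOutputTracker settings we are considering tuning (a hedged sketch; the alternative values are untested guesses for this workload, and we have not verified they help):

```java
// Candidate driver-side MapOutputTracker settings in Spark 2.x, emitted as
// spark-submit --conf flags. Defaults noted in comments; values are guesses.
import java.util.LinkedHashMap;
import java.util.Map;

public class MapOutputTrackerTuning {
    public static void main(String[] args) {
        Map<String, String> conf = new LinkedHashMap<>();
        // Threads on the driver serving map-status requests (default 8).
        conf.put("spark.shuffle.mapOutput.dispatcher.numThreads", "16");
        // Serialized map-status size above which statuses are broadcast to
        // executors instead of sent in per-executor replies (default 512k);
        // lowering it should make the broadcast path kick in sooner.
        conf.put("spark.shuffle.mapOutput.minSizeForBroadcast", "128k");
        conf.forEach((k, v) -> System.out.println("--conf " + k + "=" + v));
    }
}
```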
This is not critical, but I'd like to get rid of the errors. It happens 2
or 3 times during a 10-hour job, and the retries always succeed.
> Error communicating with MapOutputTracker when many tasks are launched
> concurrently
> -----------------------------------------------------------------------------------
>
> Key: SPARK-28362
> URL: https://issues.apache.org/jira/browse/SPARK-28362
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 2.4.2
> Environment: AWS EMR 5.24.0 with Yarn, Spark 2.4.2 and Beam 2.12.0
> Reporter: Peter Backx
> Priority: Major
>
--
This message was sent by Atlassian JIRA
(v7.6.14#76016)