[ https://issues.apache.org/jira/browse/SPARK-28362?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Peter Backx updated SPARK-28362:
--------------------------------
    Description: 
It looks like the scheduler is unintentionally mounting a DoS attack on the 
MapOutputTracker when many tasks are launched at the same time.

We are running a Beam on Spark job on AWS EMR 5.24.0 (Yarn, Spark 2.4.2, Beam 
2.12.0).

The job is running on 150 r4.4xlarge machines in cluster mode, with each 
executor sized to take up a full machine. So we have 1 machine acting as the 
driver and 149 running executors. Default parallelism is 149 executors * 13 
cores * 20 = 38740.
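
For reference, a minimal sketch of how that figure is computed and applied, 
assuming it is set explicitly (spark.default.parallelism is the real setting; 
the SparkConf wiring here is illustrative):

{code:scala}
import org.apache.spark.SparkConf

// 149 executor machines * 13 cores per executor * 20 tasks per core
val defaultParallelism = 149 * 13 * 20 // = 38740

val conf = new SparkConf()
  .set("spark.default.parallelism", defaultParallelism.toString)
{code}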

When a new stage is launched, sometimes tasks will error out with the message: 
"Error communicating with MapOutputTracker". 

I've gone over the logs and it looks like the following is happening:
 # When the final task of a previous stage completes, the driver launches new 
tasks (driver log):

 ## 19/07/10 14:04:57 INFO DAGScheduler: Submitting 38740 missing tasks from 
ShuffleMapStage 29 (MapPartitionsRDD[632] at mapToPair at 
GroupCombineFunctions.java:147) (first 15 tasks are for partitions Vector(0, 1, 
2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14))
 # Executors use the MapOutputTracker to fetch the location of the data they 
need to work on (executor log):
 ## 19/07/10 14:04:57 INFO MapOutputTrackerWorker: Don't have map outputs for 
shuffle 36, fetching them
 19/07/10 14:04:57 INFO MapOutputTrackerWorker: Doing the fetch; tracker 
endpoint = 
NettyRpcEndpointRef([spark://[email protected]:42033])
 # Usually all executors time out after 2 minutes; in rare cases some of the 
executors do seem to receive a reply (executor log). The timeout named in the 
error is picked up again in the configuration sketch after this list:
 ## 19/07/10 14:06:57 ERROR MapOutputTrackerWorker: Error communicating with 
MapOutputTracker
 org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [120 
seconds]. This timeout is controlled by spark.rpc.askTimeout
 at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:47)
 # Driver marks the task as failed and retries (driver log):
 ## 19/07/10 14:06:57 WARN TaskSetManager: Lost task 1490.0 in stage 29.0 (TID 
2724105, ip-172-28-94-245.eu-west-1.compute.internal, executor 248): 
org.apache.spark.SparkException: Error communicating with MapOutputTracker
 at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:270)
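
Since the error message itself points at spark.rpc.askTimeout, the most direct 
mitigation to try is raising that timeout so the executors outlive the burst. 
A minimal sketch; both settings are documented Spark options, but the 600s 
value is an untested guess on our side:

{code:scala}
import org.apache.spark.SparkConf

// Give the driver's MapOutputTracker endpoint more time to answer the
// burst of map-output-status requests that arrives when a stage starts.
// spark.rpc.askTimeout falls back to spark.network.timeout (120s default),
// which matches the "Futures timed out after [120 seconds]" in the error.
val conf = new SparkConf()
  .set("spark.rpc.askTimeout", "600s")
  .set("spark.network.timeout", "600s")
{code}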

I can't find any log entry that explains why the executors don't get a reply 
from the MapOutputTracker.

So my questions:
 * Is there a separate log file for the MapOutputTracker where I can find more 
info?
 * Is the parallelism set too high? It seems to be fine for the rest of the job.
 * Is there anything else we can do? Is there maybe a way to stagger the task 
launches so they don't happen all at once? (Some untested ideas are sketched 
below.)
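
On the first and third questions, the closest I have found: there does not 
appear to be a separate MapOutputTracker log file, but its loggers can be 
turned up via log4j (e.g. log4j.logger.org.apache.spark.MapOutputTrackerMaster=DEBUG 
on the driver and log4j.logger.org.apache.spark.MapOutputTrackerWorker=DEBUG on 
the executors). I also haven't found a documented way to stagger task launches, 
but the driver-side request handling has internal knobs. A speculative sketch; 
both settings below are internal, undocumented options read by 
MapOutputTracker.scala in 2.4 and may change between releases:

{code:scala}
import org.apache.spark.SparkConf

val conf = new SparkConf()
  // Internal: size of the driver-side thread pool that answers
  // map-output-status requests (default 8).
  .set("spark.shuffle.mapOutput.dispatcher.numThreads", "16")
  // Internal: statuses larger than this are broadcast to executors
  // instead of answered via individual RPC replies (default 512k);
  // lowering it pushes more of the burst onto the broadcast path.
  .set("spark.shuffle.mapOutput.minSizeForBroadcast", "128k")
{code}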

This is not super critical, but I'd like to get rid of the errors. It happens 2 
or 3 times during a 10-hour job, and the retries always succeed.


> Error communicating with MapOutputTracker when many tasks are launched 
> concurrently
> -----------------------------------------------------------------------------------
>
>                 Key: SPARK-28362
>                 URL: https://issues.apache.org/jira/browse/SPARK-28362
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.4.2
>         Environment: AWS EMR 5.24.0 with Yarn, Spark 2.4.2 and Beam 2.12.0
>            Reporter: Peter Backx
>            Priority: Major
>


