[ 
https://issues.apache.org/jira/browse/SPARK-17555?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sean Owen resolved SPARK-17555.
-------------------------------
    Resolution: Not A Problem

> ExternalShuffleBlockResolver fails randomly with External Shuffle Service  
> and Dynamic Resource Allocation on Mesos running under Marathon
> ------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-17555
>                 URL: https://issues.apache.org/jira/browse/SPARK-17555
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 1.5.2, 1.6.1, 1.6.2, 2.0.0
>         Environment: Mesos using docker with external shuffle service running 
> on Marathon. Running code from pyspark shell in client mode.
>            Reporter: Brad Willard
>              Labels: docker, dynamic_allocation, mesos, shuffle
>
> The external shuffle service throws these errors about 90% of the time. It seems to fail between stages and work inconsistently, with errors of this style about missing files. I've tested this same behavior with all the Spark versions listed on this JIRA, using the pre-built Hadoop 2.6 distributions from the Apache Spark download page. I also want to mention that everything works successfully with dynamic resource allocation turned off.
> I have read other related bugs and tried some of the suggested workarounds. Some people have blamed the switch from Akka to Netty, which got me testing this in the 1.5.x range, with no luck. I'm currently running these config options (informed by reading other bugs on JIRA that seemed related to my problem). These settings have helped it work sometimes instead of never.
> {code}
> spark.shuffle.service.port                7338
> spark.shuffle.io.numConnectionsPerPeer  4
> spark.shuffle.io.connectionTimeout      18000s
> spark.shuffle.service.enabled           true
> spark.dynamicAllocation.enabled         true
> {code}
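> As an aside, the same properties can equally be passed on the command line. A minimal sketch (the helper below is illustrative, not part of Spark) that renders a dict of these settings into spark-submit `--conf` flags:

```python
def to_submit_flags(conf):
    """Render a dict of Spark properties as spark-submit --conf flags."""
    return " ".join("--conf {}={}".format(k, v) for k, v in sorted(conf.items()))

# The settings from this ticket:
settings = {
    "spark.shuffle.service.port": "7338",
    "spark.shuffle.io.numConnectionsPerPeer": "4",
    "spark.shuffle.io.connectionTimeout": "18000s",
    "spark.shuffle.service.enabled": "true",
    "spark.dynamicAllocation.enabled": "true",
}
print(to_submit_flags(settings))
```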
> On the driver, I'm passing this config along with the pyspark submit:
> {code}
>     --conf spark.mesos.executor.docker.image=docker-registry.xxxxxxxxxxxxx.net/machine-learning/spark:spark-1-6-2v1 \
>     --conf spark.shuffle.service.enabled=true \
>     --conf spark.dynamicAllocation.enabled=true \
>     --conf spark.mesos.coarse=true \
>     --conf spark.cores.max=100 \
>     --conf spark.executor.uri=$SPARK_EXECUTOR_URI \
>     --conf spark.shuffle.service.port=7338 \
>     --executor-memory 15g
> {code}
> Under Marathon, I'm pinning each external shuffle service to an agent and starting the service like this:
> {code}
> $SPARK_HOME/sbin/start-mesos-shuffle-service.sh && tail -f $SPARK_HOME/logs/spark--org.apache.spark.deploy.mesos.MesosExternalShuffleService*
> {code}
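> Before blaming Spark, it's worth confirming the service is actually listening on 7338 on every agent. A small stdlib sketch (hypothetical diagnostic, not part of Spark) for that check:

```python
import socket

def shuffle_service_up(host, port=7338, timeout=2.0):
    """Return True if a TCP connection to the external shuffle service
    port succeeds within the timeout, False otherwise."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

# e.g.: for agent in ("mesos-agent002", "mesos-agent004"):
#           print(agent, shuffle_service_up(agent))
```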
> On startup, all seems well:
> {code}
> Welcome to
>       ____              __
>      / __/__  ___ _____/ /__
>     _\ \/ _ \/ _ `/ __/  '_/
>    /__ / .__/\_,_/_/ /_/\_\   version 1.6.1
>       /_/
> Using Python version 3.5.2 (default, Jul  2 2016 17:52:12)
> SparkContext available as sc, HiveContext available as sqlContext.
> >>> 16/09/15 11:35:53 INFO CoarseMesosSchedulerBackend: Mesos task 1 is now TASK_RUNNING
> 16/09/15 11:35:53 INFO MesosExternalShuffleClient: Successfully registered app a7a50f09-0ce0-4417-91a9-fa694416e903-0091 with external shuffle service.
> 16/09/15 11:35:55 INFO CoarseMesosSchedulerBackend: Mesos task 0 is now TASK_RUNNING
> 16/09/15 11:35:55 INFO MesosExternalShuffleClient: Successfully registered app a7a50f09-0ce0-4417-91a9-fa694416e903-0091 with external shuffle service.
> 16/09/15 11:35:56 INFO CoarseMesosSchedulerBackend: Registered executor NettyRpcEndpointRef(null) (mesos-agent002.[redacted]:61281) with ID 55300bb1-aca1-4dd1-8647-ce4a50f6d661-S6/1
> 16/09/15 11:35:56 INFO ExecutorAllocationManager: New executor 55300bb1-aca1-4dd1-8647-ce4a50f6d661-S6/1 has registered (new total is 1)
> 16/09/15 11:35:56 INFO BlockManagerMasterEndpoint: Registering block manager mesos-agent002.[redacted]:46247 with 10.6 GB RAM, BlockManagerId(55300bb1-aca1-4dd1-8647-ce4a50f6d661-S6/1, mesos-agent002.[redacted], 46247)
> 16/09/15 11:35:59 INFO CoarseMesosSchedulerBackend: Registered executor NettyRpcEndpointRef(null) (mesos-agent004.[redacted]:42738) with ID 55300bb1-aca1-4dd1-8647-ce4a50f6d661-S10/0
> 16/09/15 11:35:59 INFO ExecutorAllocationManager: New executor 55300bb1-aca1-4dd1-8647-ce4a50f6d661-S10/0 has registered (new total is 2)
> 16/09/15 11:35:59 INFO BlockManagerMasterEndpoint: Registering block manager mesos-agent004.[redacted]:11262 with 10.6 GB RAM, BlockManagerId(55300bb1-aca1-4dd1-8647-ce4a50f6d661-S10/0, mesos-agent004.[redacted], 11262)
> {code}
> I'm running a simple sort on a Spark DataFrame loaded from a Parquet file on HDFS. These are the errors; they happen shortly after startup, as agents are being allocated by Mesos.
> {code}
> 16/09/15 14:49:10 ERROR ShuffleBlockFetcherIterator: Failed to get block(s) from mesos-agent004:7338
> java.lang.RuntimeException: java.lang.RuntimeException: Failed to open file: /tmp/blockmgr-29f93bea-9a87-41de-9046-535401e6d4fd/30/shuffle_0_0_0.index
>       at org.apache.spark.network.shuffle.ExternalShuffleBlockResolver.getSortBasedShuffleBlockData(ExternalShuffleBlockResolver.java:286)
>       at org.apache.spark.network.shuffle.ExternalShuffleBlockResolver.getBlockData(ExternalShuffleBlockResolver.java:190)
>       at org.apache.spark.network.shuffle.ExternalShuffleBlockHandler.handleMessage(ExternalShuffleBlockHandler.java:85)
>       at org.apache.spark.deploy.mesos.MesosExternalShuffleBlockHandler.handleMessage(MesosExternalShuffleService.scala:61)
>       at org.apache.spark.network.shuffle.ExternalShuffleBlockHandler.receive(ExternalShuffleBlockHandler.java:72)
>       at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:149)
>       at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:102)
>       at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:104)
>       at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51)
>       at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
>       at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
>       at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
>       at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:266)
>       at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
>       at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
>       at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
>       at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
>       at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
>       at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:86)
>       at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
>       at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
>       at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
>       at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
>       at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
>       at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>       at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>       at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>       at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
>       at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.FileNotFoundException: /tmp/blockmgr-29f93bea-9a87-41de-9046-535401e6d4fd/30/shuffle_0_0_0.index (No such file or directory)
>       at java.io.FileInputStream.open(Native Method)
>       at java.io.FileInputStream.<init>(FileInputStream.java:146)
>       at org.apache.spark.network.shuffle.ExternalShuffleBlockResolver.getSortBasedShuffleBlockData(ExternalShuffleBlockResolver.java:275)
>       ... 28 more
>       at org.apache.spark.network.client.TransportResponseHandler.handle(TransportResponseHandler.java:186)
>       at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:106)
>       at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51)
>       at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
>       at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
>       at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
>       at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:266)
>       at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
>       at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
>       at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
>       at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
>       at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
>       at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:86)
>       at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
>       at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
>       at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
>       at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
>       at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
>       at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>       at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>       at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>       at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
>       at java.lang.Thread.run(Thread.java:745)
> 16/09/15 14:49:10 ERROR RetryingBlockFetcher: Failed to fetch block shuffle_0_54_57, and will not retry (0 retries)
> java.lang.RuntimeException: java.lang.RuntimeException: Failed to open file: /tmp/blockmgr-29f93bea-9a87-41de-9046-535401e6d4fd/30/shuffle_0_0_0.index
>       at org.apache.spark.network.shuffle.ExternalShuffleBlockResolver.getSortBasedShuffleBlockData(ExternalShuffleBlockResolver.java:286)
>       at org.apache.spark.network.shuffle.ExternalShuffleBlockResolver.getBlockData(ExternalShuffleBlockResolver.java:190)
>       at org.apache.spark.network.shuffle.ExternalShuffleBlockHandler.handleMessage(ExternalShuffleBlockHandler.java:85)
>       at org.apache.spark.deploy.mesos.MesosExternalShuffleBlockHandler.handleMessage(MesosExternalShuffleService.scala:61)
>       at org.apache.spark.network.shuffle.ExternalShuffleBlockHandler.receive(ExternalShuffleBlockHandler.java:72)
>       at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:149)
>       at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:102)
>       at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:104)
>       at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51)
>       at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
>       at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
>       at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
>       at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:266)
>       at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
>       at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
>       at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
>       at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
>       at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
>       at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:86)
>       at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
>       at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
>       at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
>       at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
>       at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
>       at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>       at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>       at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>       at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
> {code}
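> For what it's worth, the FileNotFoundException above shows the shuffle service cannot see the executor's blockmgr-* directory on its own filesystem. A small stdlib sketch (hypothetical diagnostic, not part of Spark) that walks up from the missing index file to the deepest path component that does exist, which helps distinguish "the directory was cleaned up" from "the directory only ever existed inside the executor's Docker container":

```python
import os

def deepest_existing_ancestor(path):
    """Walk up from a missing file to the deepest ancestor that exists
    on this host. If only /tmp survives, the whole blockmgr-* directory
    is gone -- or was never visible to this process, e.g. written inside
    a container whose /tmp is not shared with the host."""
    probe = path
    while not os.path.exists(probe):
        parent = os.path.dirname(probe)
        if parent == probe:  # reached the filesystem root
            break
        probe = parent
    return probe
```

> Run it on the shuffle-service host against the path from the stack trace, e.g. deepest_existing_ancestor("/tmp/blockmgr-29f93bea-9a87-41de-9046-535401e6d4fd/30/shuffle_0_0_0.index").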



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
