[ https://issues.apache.org/jira/browse/SPARK-17555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15573363#comment-15573363 ]

Eugene Zhulenev edited comment on SPARK-17555 at 10/13/16 10:21 PM:
--------------------------------------------------------------------

[~brdwrd]

I had the same issue, and I figured out why. Basically, your Spark executor is 
running in a Docker container and writes shuffle blocks to the /tmp directory, 
then passes this information to the External Shuffle Service. But the problem is 
that the shuffle service is running in its own Docker container. You have to 
share the same volume between your executors and the shuffle service.

I added this config to the shuffle service's Marathon configuration:

{code}
"container": {
    "type": "DOCKER",
    "docker": {
      "image": "mesosphere/spark:1.0.2-2.0.0",
      "network": "HOST"
    },
    "volumes": [
      {
        "containerPath": "/var/data/spark",
        "hostPath": "/var/data/spark",
        "mode": "RW"
      }
    ]
  }
{code}

After that, you need to specify the *spark.mesos.executor.docker.volumes* and 
*spark.local.dir* parameters so the executors write their data to the shared 
volume.
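
For example, something along these lines in spark-defaults.conf or as --conf 
flags (illustrative values; the path just mirrors the volume mapping above, so 
adjust it to your setup):

{code}
spark.local.dir                      /var/data/spark
spark.mesos.executor.docker.volumes  /var/data/spark:/var/data/spark:rw
{code}

The volumes value uses the same host_path:container_path:mode form that docker 
-v accepts, so the executor containers and the shuffle service container end up 
reading and writing the same host directory.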


> ExternalShuffleBlockResolver fails randomly with External Shuffle Service  
> and Dynamic Resource Allocation on Mesos running under Marathon
> ------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-17555
>                 URL: https://issues.apache.org/jira/browse/SPARK-17555
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 1.5.2, 1.6.1, 1.6.2, 2.0.0
>         Environment: Mesos using docker with external shuffle service running 
> on Marathon. Running code from pyspark shell in client mode.
>            Reporter: Brad Willard
>              Labels: docker, dynamic_allocation, mesos, shuffle
>
> The External Shuffle Service throws these errors about 90% of the time. It 
> seems to die between stages and work inconsistently, with this style of error 
> about missing files. I've tested this same behavior with all the Spark 
> versions listed on the JIRA, using the pre-built Hadoop 2.6 distributions from 
> the Apache Spark download page. I also want to mention that everything works 
> successfully with dynamic resource allocation turned off.
> I have read other related bugs and have tried some of the 
> workarounds/suggestions. It seems some people have blamed the switch from 
> Akka to Netty, which got me testing this in the 1.5.x range with no luck. I'm 
> currently running these config options (informed by reading other bugs on 
> JIRA that seemed related to my problem). These settings have helped it work 
> sometimes instead of never.
> {code}
> spark.shuffle.service.port              7338
> spark.shuffle.io.numConnectionsPerPeer  4
> spark.shuffle.io.connectionTimeout      18000s
> spark.shuffle.service.enabled           true
> spark.dynamicAllocation.enabled         true
> {code}
> On the driver, for the pyspark submit, I'm sending along this config:
> {code}
>     --conf spark.mesos.executor.docker.image=docker-registry.xxxxxxxxxxxxx.net/machine-learning/spark:spark-1-6-2v1 \
>     --conf spark.shuffle.service.enabled=true \
>     --conf spark.dynamicAllocation.enabled=true \
>     --conf spark.mesos.coarse=true \
>     --conf spark.cores.max=100 \
>     --conf spark.executor.uri=$SPARK_EXECUTOR_URI \
>     --conf spark.shuffle.service.port=7338 \
>     --executor-memory 15g
> {code}
> Under Marathon, I'm pinning each external shuffle service to an agent and 
> starting the service like this:
> {code}
> $SPARK_HOME/sbin/start-mesos-shuffle-service.sh && tail -f $SPARK_HOME/logs/spark--org.apache.spark.deploy.mesos.MesosExternalShuffleService*
> {code}
> On startup, it seems like all is well:
> {code}
> Welcome to
>       ____              __
>      / __/__  ___ _____/ /__
>     _\ \/ _ \/ _ `/ __/  '_/
>    /__ / .__/\_,_/_/ /_/\_\   version 1.6.1
>       /_/
> Using Python version 3.5.2 (default, Jul  2 2016 17:52:12)
> SparkContext available as sc, HiveContext available as sqlContext.
> >>> 16/09/15 11:35:53 INFO CoarseMesosSchedulerBackend: Mesos task 1 is now TASK_RUNNING
> 16/09/15 11:35:53 INFO MesosExternalShuffleClient: Successfully registered 
> app a7a50f09-0ce0-4417-91a9-fa694416e903-0091 with external shuffle service.
> 16/09/15 11:35:55 INFO CoarseMesosSchedulerBackend: Mesos task 0 is now 
> TASK_RUNNING
> 16/09/15 11:35:55 INFO MesosExternalShuffleClient: Successfully registered 
> app a7a50f09-0ce0-4417-91a9-fa694416e903-0091 with external shuffle service.
> 16/09/15 11:35:56 INFO CoarseMesosSchedulerBackend: Registered executor 
> NettyRpcEndpointRef(null) (mesos-agent002.[redacted]:61281) with ID 
> 55300bb1-aca1-4dd1-8647-ce4a50f6d661-S6/1
> 16/09/15 11:35:56 INFO ExecutorAllocationManager: New executor 
> 55300bb1-aca1-4dd1-8647-ce4a50f6d661-S6/1 has registered (new total is 1)
> 16/09/15 11:35:56 INFO BlockManagerMasterEndpoint: Registering block manager 
> mesos-agent002.[redacted]:46247 with 10.6 GB RAM, 
> BlockManagerId(55300bb1-aca1-4dd1-8647-ce4a50f6d661-S6/1, 
> mesos-agent002.[redacted], 46247)
> 16/09/15 11:35:59 INFO CoarseMesosSchedulerBackend: Registered executor 
> NettyRpcEndpointRef(null) (mesos-agent004.[redacted]:42738) with ID 
> 55300bb1-aca1-4dd1-8647-ce4a50f6d661-S10/0
> 16/09/15 11:35:59 INFO ExecutorAllocationManager: New executor 
> 55300bb1-aca1-4dd1-8647-ce4a50f6d661-S10/0 has registered (new total is 2)
> 16/09/15 11:35:59 INFO BlockManagerMasterEndpoint: Registering block manager 
> mesos-agent004.[redacted]:11262 with 10.6 GB RAM, 
> BlockManagerId(55300bb1-aca1-4dd1-8647-ce4a50f6d661-S10/0, 
> mesos-agent004.[redacted], 11262)
> {code}
> I'm running a simple sort command on a Spark DataFrame loaded from a Parquet 
> file on HDFS; a rough sketch of the job is shown below. The errors happen 
> shortly after startup, as agents are being allocated by Mesos.
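> As an illustration only (the actual path and column names are not part of 
> this report), the job in the pyspark shell is along these lines:
> {code}
> # Hypothetical repro: load a Parquet file from HDFS and sort it, forcing a shuffle
> df = sqlContext.read.parquet("hdfs:///path/to/data.parquet")  # path is illustrative
> df.sort("some_column").show()                                 # column name is illustrative
> {code}
> The fetch failures then look like this: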
> {code}
> 16/09/15 14:49:10 ERROR ShuffleBlockFetcherIterator: Failed to get block(s) 
> from mesos-agent004:7338
> java.lang.RuntimeException: java.lang.RuntimeException: Failed to open file: 
> /tmp/blockmgr-29f93bea-9a87-41de-9046-535401e6d4fd/30/shuffle_0_0_0.index
>       at 
> org.apache.spark.network.shuffle.ExternalShuffleBlockResolver.getSortBasedShuffleBlockData(ExternalShuffleBlockResolver.java:286)
>       at 
> org.apache.spark.network.shuffle.ExternalShuffleBlockResolver.getBlockData(ExternalShuffleBlockResolver.java:190)
>       at 
> org.apache.spark.network.shuffle.ExternalShuffleBlockHandler.handleMessage(ExternalShuffleBlockHandler.java:85)
>       at 
> org.apache.spark.deploy.mesos.MesosExternalShuffleBlockHandler.handleMessage(MesosExternalShuffleService.scala:61)
>       at 
> org.apache.spark.network.shuffle.ExternalShuffleBlockHandler.receive(ExternalShuffleBlockHandler.java:72)
>       at 
> org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:149)
>       at 
> org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:102)
>       at 
> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:104)
>       at 
> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51)
>       at 
> io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
>       at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
>       at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
>       at 
> io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:266)
>       at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
>       at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
>       at 
> io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
>       at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
>       at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
>       at 
> org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:86)
>       at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
>       at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
>       at 
> io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
>       at 
> io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
>       at 
> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
>       at 
> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>       at 
> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>       at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>       at 
> io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
>       at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.FileNotFoundException: 
> /tmp/blockmgr-29f93bea-9a87-41de-9046-535401e6d4fd/30/shuffle_0_0_0.index (No 
> such file or directory)
>       at java.io.FileInputStream.open(Native Method)
>       at java.io.FileInputStream.<init>(FileInputStream.java:146)
>       at 
> org.apache.spark.network.shuffle.ExternalShuffleBlockResolver.getSortBasedShuffleBlockData(ExternalShuffleBlockResolver.java:275)
>       ... 28 more
>       at 
> org.apache.spark.network.client.TransportResponseHandler.handle(TransportResponseHandler.java:186)
>       at 
> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:106)
>       at 
> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51)
>       at 
> io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
>       at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
>       at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
>       at 
> io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:266)
>       at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
>       at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
>       at 
> io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
>       at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
>       at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
>       at 
> org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:86)
>       at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
>       at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
>       at 
> io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
>       at 
> io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
>       at 
> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
>       at 
> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>       at 
> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>       at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>       at 
> io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
>       at java.lang.Thread.run(Thread.java:745)
> 16/09/15 14:49:10 ERROR RetryingBlockFetcher: Failed to fetch block 
> shuffle_0_54_57, and will not retry (0 retries)
> java.lang.RuntimeException: java.lang.RuntimeException: Failed to open file: 
> /tmp/blockmgr-29f93bea-9a87-41de-9046-535401e6d4fd/30/shuffle_0_0_0.index
>       at 
> org.apache.spark.network.shuffle.ExternalShuffleBlockResolver.getSortBasedShuffleBlockData(ExternalShuffleBlockResolver.java:286)
>       at 
> org.apache.spark.network.shuffle.ExternalShuffleBlockResolver.getBlockData(ExternalShuffleBlockResolver.java:190)
>       at 
> org.apache.spark.network.shuffle.ExternalShuffleBlockHandler.handleMessage(ExternalShuffleBlockHandler.java:85)
>       at 
> org.apache.spark.deploy.mesos.MesosExternalShuffleBlockHandler.handleMessage(MesosExternalShuffleService.scala:61)
>       at 
> org.apache.spark.network.shuffle.ExternalShuffleBlockHandler.receive(ExternalShuffleBlockHandler.java:72)
>       at 
> org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:149)
>       at 
> org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:102)
>       at 
> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:104)
>       at 
> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51)
>       at 
> io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
>       at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
>       at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
>       at 
> io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:266)
>       at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
>       at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
>       at 
> io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
>       at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
>       at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
>       at 
> org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:86)
>       at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
>       at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
>       at 
> io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
>       at 
> io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
>       at 
> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
>       at 
> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>       at 
> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>       at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>       at 
> io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
