[ https://issues.apache.org/jira/browse/SPARK-24578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16517439#comment-16517439 ]

Imran Rashid commented on SPARK-24578:
--------------------------------------

I think [~attilapiros] may be right -- can you send a PR to fix?

We were able to reproduce the issue with the code you gave, and noticed that 
the errors occur exactly at the two-minute timeout.  The receiver closes the 
connection because of the timeout, so the sender only sees a generic failure 
that the connection was closed.

Sender:
{noformat}
18/06/18 10:12:08 ERROR server.TransportRequestHandler: Error sending result ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=879251518000, chunkIndex=0}, buffer=org.apache.spark.storage.BlockManagerManagedBuffer@19b0a640} to /xxxxxx:38710; closing connection
java.io.IOException: Connection reset by peer
{noformat}

Receiver:
{noformat}
18/06/18 10:12:08 ERROR server.TransportChannelHandler: Connection to xxx.com/xxxx:38765 has been quiet for 120000 ms while there are outstanding requests. Assuming connection is dead; please adjust spark.network.timeout if this is wrong.
18/06/18 10:12:08 ERROR client.TransportResponseHandler: Still have 1 requests outstanding when connection from xxxxx.com/xxxxxxxx:38765 is closed
18/06/18 10:12:08 INFO shuffle.RetryingBlockFetcher: Retrying fetch (1/3) for 1 outstanding blocks after 5000 ms
{noformat}

At first I found it hard to believe this slowdown would make the connection 
appear totally quiet -- but I realized that there is actually a lot of 
concurrent activity going on, as all the executors are also sending and 
receiving many more blocks from each other. So if you're simultaneously dealing 
with a bunch of large blocks, and this big slowdown sits inside the netty 
threads, the connection can end up looking idle.
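
As a stopgap while a fix lands (not the real solution), bumping the idle 
timeout the receiver complains about should at least mask the symptom -- the 
120000 ms in the log is just the spark.network.timeout default. A minimal 
sketch, with an illustrative value:
{code:java}
// Hypothetical workaround only, not the fix: raise the idle-connection timeout
// that the receiver log complains about. spark.network.timeout defaults to 120s,
// which is exactly the two-minute mark where the fetch above gets dropped.
val conf = new org.apache.spark.SparkConf()
  .set("spark.network.timeout", "600s") // illustrative value, not a recommendation
{code}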

[~wbzhao] does this match the cases you see as well?

> Reading remote cache block behavior changes and causes timeout issue
> --------------------------------------------------------------------
>
>                 Key: SPARK-24578
>                 URL: https://issues.apache.org/jira/browse/SPARK-24578
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.3.0, 2.3.1
>            Reporter: Wenbo Zhao
>            Priority: Major
>
> After Spark 2.3, we observed lots of errors like the following in some of our 
> production jobs:
> {code:java}
> 18/06/15 20:59:42 ERROR TransportRequestHandler: Error sending result ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=91672904003, chunkIndex=0}, buffer=org.apache.spark.storage.BlockManagerManagedBuffer@783a9324} to /172.22.18.7:60865; closing connection
> java.io.IOException: Broken pipe
> at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
> at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
> at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
> at sun.nio.ch.IOUtil.write(IOUtil.java:65)
> at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
> at org.apache.spark.network.protocol.MessageWithHeader.writeNioBuffer(MessageWithHeader.java:156)
> at org.apache.spark.network.protocol.MessageWithHeader.copyByteBuf(MessageWithHeader.java:142)
> at org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:123)
> at io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:355)
> at io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:224)
> at io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:382)
> at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934)
> at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:362)
> at io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:901)
> at io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1321)
> at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
> at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
> at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
> at io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115)
> at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
> at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
> at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
> at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117)
> at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
> at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
> at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
> at io.netty.channel.DefaultChannelPipeline.flush(DefaultChannelPipeline.java:983)
> at io.netty.channel.AbstractChannel.flush(AbstractChannel.java:248)
> at io.netty.channel.nio.AbstractNioByteChannel$1.run(AbstractNioByteChannel.java:284)
> at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
> at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403)
> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
> at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
> at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
> {code}
>  
> Here is a small reproducible example for a cluster of 2 executors (say host-1 
> and host-2), each with 8 cores. The memory of the driver and executors is not 
> an important factor here as long as it is big enough, say 20G. 
> {code:java}
> val n = 100000000
> val df0 = sc.parallelize(1 to n).toDF
> val df = df0
>   .withColumn("x0", rand())
>   .withColumn("x0", rand())
>   .withColumn("x1", rand())
>   .withColumn("x2", rand())
>   .withColumn("x3", rand())
>   .withColumn("x4", rand())
>   .withColumn("x5", rand())
>   .withColumn("x6", rand())
>   .withColumn("x7", rand())
>   .withColumn("x8", rand())
>   .withColumn("x9", rand())
> df.cache; df.count
> (1 to 10).toArray.par.map { i =>
>   println(i)
>   df.groupBy("x1").agg(count("value")).show()
> }
> {code}
>  
> In the above example, we generate a random DataFrame of size around 7G, cache 
> it, and then perform parallel DataFrame operations by using `array.par.map`. 
> Because of the parallel computation, there is a high probability that some 
> task will be scheduled to host-2, where it needs to read the cached block 
> data from host-1. This follows the code path of 
> [https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/BlockManager.scala#L691]
> and then tries to transfer a big cached block (~500MB) from host-1 to host-2. 
> Often, this big transfer makes the cluster suffer timeout issues (it will 
> retry 3 times, each with a 120s timeout, and then recompute to put the cached 
> block into the local MemoryStore).
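> 
> For reference, the retry and timeout behavior described above is governed by 
> a few settings; the values below are just the Spark defaults (which line up 
> with the 3 retries and 120s timeout mentioned), listed as a pointer rather 
> than a recommended tuning:
> {noformat}
> spark.network.timeout        default 120s   (idle-connection timeout that kills the fetch)
> spark.shuffle.io.maxRetries  default 3      (number of fetch retries)
> spark.shuffle.io.retryWait   default 5s     (wait between retries)
> {noformat}
> 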
> We couldn't reproduce the same issue in Spark 2.2.1. From the log of Spark 
> 2.2.1, we found that 
> {code:java}
> 18/06/16 17:23:47 DEBUG BlockManager: Getting local block rdd_3_0 
> 18/06/16 17:23:47 TRACE BlockInfoManager: Task 0 trying to acquire read lock for rdd_3_0 
> 18/06/16 17:23:47 DEBUG BlockManager: Block rdd_3_0 was not found 
> 18/06/16 17:23:47 DEBUG BlockManager: Getting remote block rdd_3_0 
> 18/06/16 17:23:47 DEBUG BlockManager: Block rdd_3_0 not found 
> 18/06/16 17:23:47 TRACE BlockInfoManager: Task 0 trying to put rdd_3_0 
> 18/06/16 17:23:47 TRACE BlockInfoManager: Task 0 trying to acquire read lock for rdd_3_0 
> 18/06/16 17:23:47 TRACE BlockInfoManager: Task 0 trying to acquire write lock for rdd_3_0 
> 18/06/16 17:23:47 TRACE BlockInfoManager: Task 0 acquired write lock for rdd_3_0 
> 18/06/16 17:23:58 INFO MemoryStore: Block rdd_3_0 stored as values in memory (estimated size 538.2 MB, free 11.1 GB)
> {code}
> That is, when a task is scheduled to host-2 where it needs to read the cached 
> block rdd_3_0 from host-1, the endpoint of `master.getLocations(..)` (see 
> [https://github.com/apache/spark/blob/v2.2.1/core/src/main/scala/org/apache/spark/storage/BlockManager.scala#L622])
> reports that the remote cache block is not found, which triggers the recompute. 
> -I believe this behavior change was introduced by this change set 
> [https://github.com/apache/spark/commit/e1960c3d6f380b0dfbba6ee5d8ac6da4bc29a698#diff-2b643ea78c1add0381754b1f47eec132]-
>  
> We have two questions here:
>  # What is the right behavior: should we recompute, or should we transfer the 
> block from the remote host?
>  # If we should transfer from the remote host, why is the performance so bad 
> for cached blocks?
>  


