[
https://issues.apache.org/jira/browse/SPARK-7703?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14573396#comment-14573396
]
Imran Rashid commented on SPARK-7703:
-------------------------------------
Hi [~wenhailong1988],
Thanks for reporting this, and for the thorough analysis. I would need to study
this code a little more carefully to be sure, but what you are saying sounds
very reasonable. Since you have already done a lot of the work to find a fix,
would you like to submit a pull request with the fix yourself? You can see more
details on contributing here:
https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark
We generally do the more detailed code review on the github pull requests, but
some high-level comments:
* Do you think this might be related to some other issues reported with
TorrentBroadcast, in particular SPARK-5812 and SPARK-5594? It appears to be
different, but you might have more insight.
* Can you create a small reproduction which demonstrates the issue? It might
be a fake job which manually kills some executors, or something similar. This
will be useful for reviewers and will also help prevent future regressions.
Creating a test like this might be tough, since the hooks you need might not
exist. I've recently been writing some tests that are somewhat like this, so I
might be able to help out if you get stuck (or hopefully at least figure out
what hooks we need to add).
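To make clear what such a reproduction would need to exercise, here is a
rough, Spark-free sketch of Issue 1 from the report quoted below. Everything
in it is a hypothetical stand-in (Loc, fetch, unguardedGet, guardedGet); none
of these are Spark APIs, and a real test would have to go through BlockManager
itself. It only illustrates the difference between letting the first failed
fetch propagate and moving on to the next location.

```scala
import java.io.IOException
import scala.util.{Success, Try}

object FetchFailoverSketch {
  // Hypothetical stand-in for a block location; not Spark's BlockManagerId.
  final case class Loc(host: String, alive: Boolean)

  // Hypothetical stand-in for the remote fetch: a dead location throws,
  // similar to the "Failed to connect" IOException in the report.
  def fetch(loc: Loc): String =
    if (loc.alive) s"data-from-${loc.host}"
    else throw new IOException(s"Failed to connect to ${loc.host}")

  // Same shape as the reported loop: locations are tried in order, but the
  // first dead location throws and the remaining ones are never tried.
  def unguardedGet(locations: Seq[Loc]): Option[String] =
    locations.iterator.map(fetch).find(_ != null)

  // Each fetch is wrapped in Try, so a failure falls through to the next
  // location instead of aborting the whole lookup.
  def guardedGet(locations: Seq[Loc]): Option[String] =
    locations.iterator
      .map(loc => Try(fetch(loc)))
      .collectFirst { case Success(data) if data != null => data }

  def main(args: Array[String]): Unit = {
    val locations = Seq(Loc("dead-executor", alive = false), Loc("driver", alive = true))
    println(Try(unguardedGet(locations))) // a Failure wrapping the IOException
    println(guardedGet(locations))        // the block fetched from the live location
  }
}
```

A real regression test would presumably replace the alive flag with an
executor that has actually been shut down while its blocks are still
registered with the master.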
> Task failure caused by block fetch failure in BlockManager.doGetRemote() when
> using TorrentBroadcast
> ----------------------------------------------------------------------------------------------------
>
> Key: SPARK-7703
> URL: https://issues.apache.org/jira/browse/SPARK-7703
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 1.2.1, 1.3.1
> Environment: Red Hat Enterprise Linux Server release 7.0 (Maipo)
> Spark 1.3.1 Release
> Reporter: Hailong Wen
>
> I am from the IBM Platform Symphony team, and we are working to integrate
> Spark with our EGO to provide a fine-grained dynamic allocation Resource
> Manager. We found a defect in the current implementation of
> BlockManager.doGetRemote():
> {noformat}
> private def doGetRemote(blockId: BlockId, asBlockResult: Boolean):
>     Option[Any] = {
>   require(blockId != null, "BlockId is null")
>   // Issue 2: locations may be out of date
>   val locations = Random.shuffle(master.getLocations(blockId))
>   for (loc <- locations) {
>     logDebug(s"Getting remote block $blockId from $loc")
>     // Issue 1: this statement is not in try/catch
>     val data = blockTransferService.fetchBlockSync(
>       loc.host, loc.port, loc.executorId, blockId.toString).nioByteBuffer()
>     if (data != null) {
>       if (asBlockResult) {
>         return Some(new BlockResult(
>           dataDeserialize(blockId, data),
>           DataReadMethod.Network,
>           data.limit()))
>       } else {
>         return Some(data)
>       }
>     }
>     logDebug(s"The value of block $blockId is null")
>   }
>   logDebug(s"Block $blockId not found")
>   None
> }
> {noformat}
> * Issue 1: Although the block fetch uses a "for" loop to try all available
> locations, the fetch call is not guarded by a "Try" block. When an exception
> occurs, the method throws immediately instead of trying the other block
> locations, and the uncaught exception causes the task to fail.
> * Issue 2: The "locations" value is acquired once, before fetching; however,
> in a dynamic allocation environment the block locations may change.
> We hit both issues in our use case, where an executor exits after all of its
> assigned tasks are done. We *occasionally* get the following error (Issue 1):
> {noformat}
> 15/05/13 10:28:35 INFO Executor: Running task 27.0 in stage 0.0 (TID 27)
> 15/05/13 10:28:35 DEBUG Executor: Task 26's epoch is 0
> 15/05/13 10:28:35 DEBUG Executor: Task 28's epoch is 0
> 15/05/13 10:28:35 DEBUG Executor: Task 27's epoch is 0
> 15/05/13 10:28:35 DEBUG BlockManager: Getting local block broadcast_0
> 15/05/13 10:28:35 DEBUG BlockManager: Block broadcast_0 not registered locally
> 15/05/13 10:28:35 INFO TorrentBroadcast: Started reading broadcast variable 0
> 15/05/13 10:28:35 DEBUG TorrentBroadcast: Reading piece broadcast_0_piece0 of
> broadcast_0
> 15/05/13 10:28:35 DEBUG BlockManager: Getting local block broadcast_0_piece0
> as bytes
> 15/05/13 10:28:35 DEBUG BlockManager: Block broadcast_0_piece0 not registered
> locally
> 15/05/13 10:28:35 DEBUG BlockManager: Getting remote block broadcast_0_piece0
> as bytes
> 15/05/13 10:28:35 DEBUG BlockManager: Getting remote block broadcast_0_piece0
> from BlockManagerId(c390c311-bd97-4a99-bcb9-b32fd3dede17, sparkbj01, 37599)
> 15/05/13 10:28:35 TRACE NettyBlockTransferService: Fetch blocks from
> sparkbj01:37599 (executor id c390c311-bd97-4a99-bcb9-b32fd3dede17)
> 15/05/13 10:28:35 DEBUG TransportClientFactory: Creating new connection to
> sparkbj01/9.111.254.195:37599
> 15/05/13 10:28:35 ERROR RetryingBlockFetcher: Exception while beginning fetch
> of 1 outstanding blocks
> java.io.IOException: Failed to connect to sparkbj01/9.111.254.195:37599
> at
> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:191)
> at
> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
> at
> org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:78)
> at
> org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
> at
> org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120)
> at
> org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:87)
> at
> org.apache.spark.network.BlockTransferService.fetchBlockSync(BlockTransferService.scala:89)
> at
> org.apache.spark.storage.BlockManager$$anonfun$doGetRemote$2.apply(BlockManager.scala:599)
> at
> org.apache.spark.storage.BlockManager$$anonfun$doGetRemote$2.apply(BlockManager.scala:597)
> at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at
> org.apache.spark.storage.BlockManager.doGetRemote(BlockManager.scala:597)
> at
> org.apache.spark.storage.BlockManager.getRemoteBytes(BlockManager.scala:591)
> at
> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.org$apache$spark$broadcast$TorrentBroadcast$$anonfun$$getRemote$1(TorrentBroadcast.scala:126)
> at
> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$1.apply(TorrentBroadcast.scala:136)
> at
> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$1.apply(TorrentBroadcast.scala:136)
> at scala.Option.orElse(Option.scala:257)
> at
> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply$mcVI$sp(TorrentBroadcast.scala:136)
> at
> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:119)
> at
> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:119)
> at scala.collection.immutable.List.foreach(List.scala:318)
> at
> org.apache.spark.broadcast.TorrentBroadcast.org$apache$spark$broadcast$TorrentBroadcast$$readBlocks(TorrentBroadcast.scala:119)
> at
> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:174)
> at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1149)
> at
> org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:164)
> at
> org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
> at
> org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
> at
> org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:87)
> at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:58)
> at org.apache.spark.scheduler.Task.run(Task.scala:56)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
> at java.lang.Thread.run(Thread.java:662)
> Caused by: java.net.ConnectException: Connection refused:
> sparkbj01/9.111.254.195:37599
> at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
> at
> sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:567)
> at
> io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:208)
> at
> io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:287)
> at
> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
> at
> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
> at
> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
> at
> io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
> ... 1 more
> 15/05/13 10:28:35 INFO RetryingBlockFetcher: Retrying fetch (1/3) for 1
> outstanding blocks after 5000 ms
> 15/05/13 10:28:40 DEBUG TransportClientFactory: Creating new connection to
> sparkbj01/9.111.254.195:37599
> 15/05/13 10:28:40 ERROR RetryingBlockFetcher: Exception while beginning fetch
> of 1 outstanding blocks (after 1 retries)
> java.io.IOException: Failed to connect to sparkbj01/9.111.254.195:37599
> at
> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:191)
> at
> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
> at
> org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:78)
> at
> org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
> at
> org.apache.spark.network.shuffle.RetryingBlockFetcher.access$200(RetryingBlockFetcher.java:43)
> at
> org.apache.spark.network.shuffle.RetryingBlockFetcher$1.run(RetryingBlockFetcher.java:170)
> at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
> at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
> at java.util.concurrent.FutureTask.run(FutureTask.java:138)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
> at java.lang.Thread.run(Thread.java:662)
> Caused by: java.net.ConnectException: Connection refused:
> sparkbj01/9.111.254.195:37599
> at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
> at
> sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:567)
> at
> io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:208)
> at
> io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:287)
> at
> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
> at
> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
> at
> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
> at
> io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
> ... 1 more
> {noformat}
> We did send "ExecutorLost" messages so that BlockManagerMaster can remove the
> executor from its block location map. But due to network latency, the
> "getLocations" call may happen before the removal.
> In our heavy-workload environment, some tasks may keep failing and eventually
> cause *job failure*.
> Using HttpBroadcast instead of the default TorrentBroadcast did resolve this
> problem, but we want better performance. So we added a Try block, but found
> that the "for" loop tries dozens of dead executors before finally fetching
> the block from the driver's BlockManager. This process takes *several
> minutes*.
> We are now working around this problem by the following fix:
> {noformat}
> // Requires: import scala.util.{Failure, Success, Try}
> private def doGetRemote(blockId: BlockId, asBlockResult: Boolean):
>     Option[Any] = {
>   require(blockId != null, "BlockId is null")
>   var blockFetched = false
>   while (!blockFetched) {
>     // Re-query the master on every attempt so that stale locations drop out.
>     val locations = Random.shuffle(master.getLocations(blockId))
>     if (locations.isEmpty) {
>       // Guard: locations.head would throw on an empty location list.
>       logDebug(s"Block $blockId not found")
>       return None
>     }
>     val loc = locations.head
>     logDebug(s"Getting remote block $blockId from $loc")
>     val dataTry = Try(blockTransferService.fetchBlockSync(
>       loc.host, loc.port, loc.executorId, blockId.toString).nioByteBuffer())
>
>     dataTry match {
>       case Success(data) =>
>         if (data != null) {
>           if (asBlockResult) {
>             return Some(new BlockResult(
>               dataDeserialize(blockId, data),
>               DataReadMethod.Network,
>               data.limit()))
>           } else {
>             return Some(data)
>           }
>         }
>         logDebug(s"The value of block $blockId is null")
>       case Failure(e) =>
>         logWarning(s"Failed to fetch block ${blockId.toString} from ${loc.host}:"
>           + s"${loc.port} executorId:${loc.executorId}. "
>           + (if (locations.size <= 1) "" else "Will update locations and retry."))
>     }
>     // If we have no more than 1 location to get from (the driver), we may
>     // stop retrying and just exit.
>     blockFetched = (locations.size <= 1)
>   }
>   logDebug(s"Block $blockId not found")
>   None
> }
> {noformat}
> This fix suppresses the exception when a fetch fails and refreshes the
> locations to reduce future failures.
> We hope to get help from experts in the community toward a more thorough
> solution (e.g., can we try all available block locations in a random
> rolling manner, instead of retrying the same location 4 times consecutively?)
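One possible reading of the "random rolling" idea in the closing question is
sketched below, purely as a starting point for discussion. It assumes two
hypothetical callbacks, getLocations and fetchFrom, standing in for
master.getLocations(blockId) and a single fetch attempt; the names, the
attemptsLeft bound and the failed set are all illustrative and not part of
Spark. Each attempt refreshes the location list, skips locations that have
already failed, and picks one of the remaining candidates at random.

```scala
import scala.annotation.tailrec
import scala.util.{Random, Success, Try}

object RollingFetchSketch {
  // getLocations and fetchFrom are hypothetical stand-ins, not Spark APIs.
  // fetchFrom is assumed to make a single attempt without internal retries.
  @tailrec
  def fetchRolling[L, T](
      getLocations: () => Seq[L],
      fetchFrom: L => T,
      attemptsLeft: Int,
      failed: Set[L] = Set.empty[L]): Option[T] = {
    // Refresh the locations on every attempt and drop the ones that
    // have already failed, so no location is tried twice.
    val candidates = getLocations().filterNot(failed.contains)
    if (attemptsLeft <= 0 || candidates.isEmpty) {
      None
    } else {
      val loc = candidates(Random.nextInt(candidates.size))
      Try(fetchFrom(loc)) match {
        case Success(data) if data != null => Some(data)
        // A failed or empty fetch marks the location as bad and moves on.
        case _ => fetchRolling(getLocations, fetchFrom, attemptsLeft - 1, failed + loc)
      }
    }
  }
}
```

Whether skipping a location permanently after one failure is the right policy
is an open question; in the log above the underlying fetcher already retries
the same location several times, which this sketch does not model.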