DENG FEI created SPARK-25070:
--------------------------------
Summary: BlockFetchingListener#onBlockFetchSuccess throw "java.util.NoSuchElementException: key not found: shuffle_8_68_113" on ShuffleBlockFetcherIterator caused stage hang long time
Key: SPARK-25070
URL: https://issues.apache.org/jira/browse/SPARK-25070
Project: Spark
Issue Type: Bug
Components: Spark Core
Affects Versions: 2.2.0
Reporter: DENG FEI
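The task fetched the shuffle block successfully, but onBlockFetchSuccess then threw an exception. As a result the task hung for a long time, and because speculation was disabled (spark.speculation = false), no speculative attempt was launched to take its place.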
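The hang can be illustrated with a simplified model. In ShuffleBlockFetcherIterator the task thread waits on the {{results}} queue (via {{results.take()}} in {{next()}}), so if the success callback throws before it reaches {{results.put}}, nothing ever arrives for that block. This is a minimal sketch under stated assumptions: {{UnguardedFetchModel}}, {{FetchResult}}, {{SuccessResult}} and {{fetchAndWait}} are hypothetical names, not Spark APIs, and the model uses {{poll}} with a timeout where the real task would block indefinitely.

{code:scala}
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}

// Hypothetical, simplified model of the hang; these names are not Spark APIs.
object UnguardedFetchModel {
  sealed trait FetchResult
  final case class SuccessResult(blockId: String, size: Long) extends FetchResult

  // Delivers one block on the "network thread", then waits briefly for it on the "task thread".
  def fetchAndWait(blockId: String, sizeMap: Map[String, Long]): Option[FetchResult] = {
    val results = new LinkedBlockingQueue[FetchResult]()

    // Success callback with no guard: sizeMap(id) throws for a missing key,
    // so put() is never reached.
    def onBlockFetchSuccess(id: String): Unit =
      results.put(SuccessResult(id, sizeMap(id)))

    // The caller only logs the exception, much like the WARN from TransportChannelHandler.
    try onBlockFetchSuccess(blockId)
    catch { case e: NoSuchElementException => println(s"WARN ${e.getMessage}") }

    // The real task calls take() and would block forever here; poll() with a
    // timeout returns null instead, which Option(...) turns into None.
    Option(results.poll(200, TimeUnit.MILLISECONDS))
  }

  def main(args: Array[String]): Unit = {
    val sizes = Map("shuffle_8_68_112" -> 1024L)
    println(fetchAndWait("shuffle_8_68_112", sizes)) // a SuccessResult arrives
    println(fetchAndWait("shuffle_8_68_113", sizes)) // None: nothing was ever enqueued
  }
}
{code}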
The log is below:
{code}
18/08/08 14:55:53 INFO ShuffleBlockFetcherIterator: Started 16 remote fetches in 16 ms
18/08/08 14:55:53 WARN TransportChannelHandler: Exception in connection from /xxx.xxx.xxx.xxx:7337
java.util.NoSuchElementException: key not found: shuffle_8_68_113
    at scala.collection.MapLike$class.default(MapLike.scala:228)
    at scala.collection.AbstractMap.default(Map.scala:59)
    at scala.collection.MapLike$class.apply(MapLike.scala:141)
    at scala.collection.AbstractMap.apply(Map.scala:59)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator$$anon$1.onBlockFetchSuccess(ShuffleBlockFetcherIterator.scala:217)
    at org.apache.spark.network.shuffle.RetryingBlockFetcher$RetryingBlockFetchListener.onBlockFetchSuccess(RetryingBlockFetcher.java:204)
    at org.apache.spark.network.shuffle.OneForOneBlockFetcher$ChunkCallback.onSuccess(OneForOneBlockFetcher.java:97)
    at org.apache.spark.network.client.TransportResponseHandler.handle(TransportResponseHandler.java:171)
    at org.apache.spark.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:120)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:292)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:278)
    at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:266)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:292)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:278)
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:292)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:278)
    at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:85)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:292)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:278)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:962)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:485)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:399)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:371)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:112)
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
    at java.lang.Thread.run(Thread.java:745)
18/08/08 14:55:53 INFO Executor: Finished task 44.0 in stage 14.0 (TID 1483). 3458 bytes result sent to driver
18/08/09 10:02:32 INFO Executor: Executor is trying to kill task 113.0 in stage 14.0 (TID 1552), reason: stage cancelled
{code}
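The top frames of the trace ({{MapLike.apply}} falling through to {{MapLike.default}}) indicate that the exception comes from a {{Map.apply}} lookup on a missing key; in the listener shown below, the only such lookup is {{sizeMap(blockId)}}. The sketch reproduces the same message with a stand-in {{sizeMap}} and contrasts it with {{Map.get}}, which returns an {{Option}} instead of throwing. {{KeyNotFoundDemo}} and its methods are illustrative names only, not part of Spark.

{code:scala}
import scala.util.Try

// Illustrative names only; sizeMap stands in for the listener's block-size map.
object KeyNotFoundDemo {
  val sizeMap: Map[String, Long] = Map("shuffle_8_68_112" -> 1024L)

  // Map.apply throws NoSuchElementException("key not found: <key>") for a missing key.
  def sizeViaApply(blockId: String): Try[Long] = Try(sizeMap(blockId))

  // Map.get returns None for a missing key instead of throwing.
  def sizeViaGet(blockId: String): Option[Long] = sizeMap.get(blockId)

  def main(args: Array[String]): Unit = {
    println(sizeViaApply("shuffle_8_68_113")) // a Failure wrapping the NoSuchElementException
    println(sizeViaGet("shuffle_8_68_113"))   // None
  }
}
{code}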
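The listener below wraps the body of onBlockFetchSuccess in a try/catch, so that an exception thrown on the success path (such as the NoSuchElementException in the log above) is reported through onBlockFetchFailure instead of escaping into the Netty handler:
{code:java}
// Defined inside ShuffleBlockFetcherIterator.sendRequest(req: FetchRequest);
// address, sizeMap, remainingBlocks, results, isZombie and startTime come from
// the enclosing method or iterator.
val blockFetchingListener = new BlockFetchingListener {
  override def onBlockFetchSuccess(blockId: String, buf: ManagedBuffer): Unit = {
    // Only add the buffer to results queue if the iterator is not zombie,
    // i.e. cleanup() has not been called yet.
    ShuffleBlockFetcherIterator.this.synchronized {
      try {
        if (!isZombie) {
          // Increment the ref count because we need to pass this to a different thread.
          // This needs to be released after use.
          buf.retain()
          remainingBlocks -= blockId
          results.put(new SuccessFetchResult(BlockId(blockId), address,
            sizeMap(blockId), buf, remainingBlocks.isEmpty))
          logDebug("remainingBlocks: " + remainingBlocks)
        }
      } catch {
        // Report any exception from the success path (e.g. a missing sizeMap
        // entry) as a fetch failure so the task does not wait forever.
        case e: Throwable => onBlockFetchFailure(blockId, e)
      }
    }
    logTrace("Got remote block " + blockId + " after " + Utils.getUsedTimeMs(startTime))
  }

  override def onBlockFetchFailure(blockId: String, e: Throwable): Unit = {
    logError(s"Failed to get block(s) from ${req.address.host}:${req.address.port}", e)
    results.put(new FailureFetchResult(BlockId(blockId), address, e))
  }
}
{code}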
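The same simplified model with the try/catch from the listener above shows the intended effect: the exception is turned into a failure result, so the waiting consumer receives something promptly. In Spark, a {{FailureFetchResult}} is presumably surfaced by the iterator as a fetch failure that the scheduler can retry, rather than an indefinite wait; that part is not modeled here. All names ({{GuardedFetchModel}}, {{FailureResult}}, and so on) are hypothetical.

{code:scala}
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}

// Hypothetical, simplified model of the try/catch in the listener; not Spark APIs.
object GuardedFetchModel {
  sealed trait FetchResult
  final case class SuccessResult(blockId: String, size: Long) extends FetchResult
  final case class FailureResult(blockId: String, e: Throwable) extends FetchResult

  def fetchAndWait(blockId: String, sizeMap: Map[String, Long]): Option[FetchResult] = {
    val results = new LinkedBlockingQueue[FetchResult]()

    def onBlockFetchFailure(id: String, e: Throwable): Unit =
      results.put(FailureResult(id, e))

    // Any exception on the success path is converted into a failure result,
    // as the catch block in the listener does.
    def onBlockFetchSuccess(id: String): Unit =
      try results.put(SuccessResult(id, sizeMap(id)))
      catch { case e: Throwable => onBlockFetchFailure(id, e) }

    onBlockFetchSuccess(blockId)
    // A result is always enqueued now, so a blocking take() would return promptly.
    Option(results.poll(200, TimeUnit.MILLISECONDS))
  }

  def main(args: Array[String]): Unit = {
    val sizes = Map("shuffle_8_68_112" -> 1024L)
    println(fetchAndWait("shuffle_8_68_113", sizes)) // a FailureResult carrying the NoSuchElementException
  }
}
{code}

Catching {{Throwable}} mirrors the listener above; {{scala.util.control.NonFatal}} would be the more conservative choice, since it does not swallow fatal JVM errors.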