[ 
https://issues.apache.org/jira/browse/SPARK-25070?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

DENG FEI updated SPARK-25070:
-----------------------------
    Description: 
The task fetched the shuffle block successfully, but onBlockFetchSuccess then 
failed, causing the task to hang for a long time with speculation set to false.

The log is below:

18/08/08 14:55:53 INFO ShuffleBlockFetcherIterator: Started 16 remote fetches 
in 16 ms

18/08/08 14:55:53 WARN TransportChannelHandler: Exception in connection from 
/xxx.xxx.xxx.xxx:7337 java.util.NoSuchElementException: key not found: 
shuffle_8_68_113 at scala.collection.MapLike$class.default(MapLike.scala:228) 
at scala.collection.AbstractMap.default(Map.scala:59) at 
scala.collection.MapLike$class.apply(MapLike.scala:141) at 
scala.collection.AbstractMap.apply(Map.scala:59) at 
org.apache.spark.storage.ShuffleBlockFetcherIterator$$anon$1.onBlockFetchSuccess(ShuffleBlockFetcherIterator.scala:217)
 at 
org.apache.spark.network.shuffle.RetryingBlockFetcher$RetryingBlockFetchListener.onBlockFetchSuccess(RetryingBlockFetcher.java:204)
 at 
org.apache.spark.network.shuffle.OneForOneBlockFetcher$ChunkCallback.onSuccess(OneForOneBlockFetcher.java:97)
 at 
org.apache.spark.network.client.TransportResponseHandler.handle(TransportResponseHandler.java:171)
 at

XXXXXX

18/08/08 14:55:53 INFO Executor: Finished task 44.0 in stage 14.0 (TID 1483). 
3458 bytes result sent to driver

18/08/09 10:02:32 INFO Executor: Executor is trying to kill task 113.0 in 
stage 14.0 (TID 1552), reason: stage cancelled
{code:java}
val blockFetchingListener = new BlockFetchingListener {
  override def onBlockFetchSuccess(blockId: String, buf: ManagedBuffer): Unit = 
{
    // Only add the buffer to results queue if the iterator is not zombie,
    // i.e. cleanup() has not been called yet.
    ShuffleBlockFetcherIterator.this.synchronized {
      try {
        if (!isZombie) {
          // Increment the ref count because we need to pass this to a 
different thread.
          // This needs to be released after use.
          buf.retain()
          remainingBlocks -= blockId
          results.put(new SuccessFetchResult(BlockId(blockId), address, 
sizeMap(blockId), buf,
            remainingBlocks.isEmpty))
          logDebug("remainingBlocks: " + remainingBlocks)
        }
      } catch {
        case e : Throwable => onBlockFetchFailure(blockId, e)
      }
    }
    logTrace("Got remote block " + blockId + " after " + 
Utils.getUsedTimeMs(startTime))
  }

  override def onBlockFetchFailure(blockId: String, e: Throwable): Unit = {
    logError(s"Failed to get block(s) from 
${req.address.host}:${req.address.port}", e)
    results.put(new FailureFetchResult(BlockId(blockId), address, e))
  }
}
{code}
 

 
{code:java}
results.put(new SuccessFetchResult(BlockId(blockId), address, sizeMap(blockId), 
buf,
            remainingBlocks.isEmpty)){code}
  

 

  was:
The task fetched the shuffle block successfully, but onBlockFetchSuccess then 
failed, causing the task to hang for a long time with speculation set to false.

The log is below:

18/08/08 14:55:53 INFO ShuffleBlockFetcherIterator: Started 16 remote fetches 
in 16 ms 18/08/08 14:55:53 WARN TransportChannelHandler: Exception in 
connection from /xxx.xxx.xxx.xxx:7337 java.util.NoSuchElementException: key not 
found: shuffle_8_68_113 at 
scala.collection.MapLike$class.default(MapLike.scala:228) at 
scala.collection.AbstractMap.default(Map.scala:59) at 
scala.collection.MapLike$class.apply(MapLike.scala:141) at 
scala.collection.AbstractMap.apply(Map.scala:59) at 
org.apache.spark.storage.ShuffleBlockFetcherIterator$$anon$1.onBlockFetchSuccess(ShuffleBlockFetcherIterator.scala:217)
 at 
org.apache.spark.network.shuffle.RetryingBlockFetcher$RetryingBlockFetchListener.onBlockFetchSuccess(RetryingBlockFetcher.java:204)
 at 
org.apache.spark.network.shuffle.OneForOneBlockFetcher$ChunkCallback.onSuccess(OneForOneBlockFetcher.java:97)
 at 
org.apache.spark.network.client.TransportResponseHandler.handle(TransportResponseHandler.java:171)
 at 
org.apache.spark.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:120)
 at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:292)
 at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:278)
 at 
io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:266)
 at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:292)
 at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:278)
 at 
io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
 at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:292)
 at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:278)
 at 
org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:85)
 at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:292)
 at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:278)
 at 
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:962)
 at 
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
 at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528) 
at 
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:485)
 at 
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:399) at 
io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:371) at 
io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:112)
 at 
io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
 at java.lang.Thread.run(Thread.java:745) 18/08/08 14:55:53 INFO Executor: 
Finished task 44.0 in stage 14.0 (TID 1483). 3458 bytes result sent to driver 
18/08/09 10:02:32 INFO Executor: Executor is trying to kill task 113.0 in stage 
14.0 (TID 1552), reason: stage cancelled
{code:java}
val blockFetchingListener = new BlockFetchingListener {
  override def onBlockFetchSuccess(blockId: String, buf: ManagedBuffer): Unit = 
{
    // Only add the buffer to results queue if the iterator is not zombie,
    // i.e. cleanup() has not been called yet.
    ShuffleBlockFetcherIterator.this.synchronized {
      try {
        if (!isZombie) {
          // Increment the ref count because we need to pass this to a 
different thread.
          // This needs to be released after use.
          buf.retain()
          remainingBlocks -= blockId
          results.put(new SuccessFetchResult(BlockId(blockId), address, 
sizeMap(blockId), buf,
            remainingBlocks.isEmpty))
          logDebug("remainingBlocks: " + remainingBlocks)
        }
      } catch {
        case e : Throwable => onBlockFetchFailure(blockId, e)
      }
    }
    logTrace("Got remote block " + blockId + " after " + 
Utils.getUsedTimeMs(startTime))
  }

  override def onBlockFetchFailure(blockId: String, e: Throwable): Unit = {
    logError(s"Failed to get block(s) from 
${req.address.host}:${req.address.port}", e)
    results.put(new FailureFetchResult(BlockId(blockId), address, e))
  }
}
{code}
 

 

 


> BlockFetchingListener#onBlockFetchSuccess throws 
> "java.util.NoSuchElementException: key not found: shuffle_8_68_113" in 
> ShuffleBlockFetcherIterator, causing the stage to hang for a long time
> -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-25070
>                 URL: https://issues.apache.org/jira/browse/SPARK-25070
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.2.0
>            Reporter: DENG FEI
>            Priority: Major
>
> The task fetched the shuffle block successfully, but onBlockFetchSuccess then 
> failed, causing the task to hang for a long time with speculation set to false.
> The log is below:
> 18/08/08 14:55:53 INFO ShuffleBlockFetcherIterator: Started 16 remote fetches 
> in 16 ms
> 18/08/08 14:55:53 WARN TransportChannelHandler: Exception in connection from 
> /xxx.xxx.xxx.xxx:7337 java.util.NoSuchElementException: key not found: 
> shuffle_8_68_113 at scala.collection.MapLike$class.default(MapLike.scala:228) 
> at scala.collection.AbstractMap.default(Map.scala:59) at 
> scala.collection.MapLike$class.apply(MapLike.scala:141) at 
> scala.collection.AbstractMap.apply(Map.scala:59) at 
> org.apache.spark.storage.ShuffleBlockFetcherIterator$$anon$1.onBlockFetchSuccess(ShuffleBlockFetcherIterator.scala:217)
>  at 
> org.apache.spark.network.shuffle.RetryingBlockFetcher$RetryingBlockFetchListener.onBlockFetchSuccess(RetryingBlockFetcher.java:204)
>  at 
> org.apache.spark.network.shuffle.OneForOneBlockFetcher$ChunkCallback.onSuccess(OneForOneBlockFetcher.java:97)
>  at 
> org.apache.spark.network.client.TransportResponseHandler.handle(TransportResponseHandler.java:171)
>  at
> XXXXXX
> 18/08/08 14:55:53 INFO Executor: Finished task 44.0 in stage 14.0 (TID 1483). 
> 3458 bytes result sent to driver
> 18/08/09 10:02:32 INFO Executor: Executor is trying to kill task 113.0 in 
> stage 14.0 (TID 1552), reason: stage cancelled
> {code:java}
> val blockFetchingListener = new BlockFetchingListener {
>   override def onBlockFetchSuccess(blockId: String, buf: ManagedBuffer): Unit 
> = {
>     // Only add the buffer to results queue if the iterator is not zombie,
>     // i.e. cleanup() has not been called yet.
>     ShuffleBlockFetcherIterator.this.synchronized {
>       try {
>         if (!isZombie) {
>           // Increment the ref count because we need to pass this to a 
> different thread.
>           // This needs to be released after use.
>           buf.retain()
>           remainingBlocks -= blockId
>           results.put(new SuccessFetchResult(BlockId(blockId), address, 
> sizeMap(blockId), buf,
>             remainingBlocks.isEmpty))
>           logDebug("remainingBlocks: " + remainingBlocks)
>         }
>       } catch {
>         case e : Throwable => onBlockFetchFailure(blockId, e)
>       }
>     }
>     logTrace("Got remote block " + blockId + " after " + 
> Utils.getUsedTimeMs(startTime))
>   }
>   override def onBlockFetchFailure(blockId: String, e: Throwable): Unit = {
>     logError(s"Failed to get block(s) from 
> ${req.address.host}:${req.address.port}", e)
>     results.put(new FailureFetchResult(BlockId(blockId), address, e))
>   }
> }
> {code}
>  
>  
> {code:java}
> results.put(new SuccessFetchResult(BlockId(blockId), address, 
> sizeMap(blockId), buf,
>             remainingBlocks.isEmpty)){code}
>   
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to