[
https://issues.apache.org/jira/browse/SPARK-40168?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Zhongwei Zhu updated SPARK-40168:
---------------------------------
Description:
When shuffle files are not found, the decommissioner handles IOException, but the
actual exception thrown is a SparkException wrapping it, as shown below:
{code:java}
22/08/10 18:05:34 ERROR BlockManagerDecommissioner: Error occurred during
migrating migrate_shuffle_1_356
org.apache.spark.SparkException: Exception thrown in awaitResult:
at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:301)
at
org.apache.spark.network.BlockTransferService.uploadBlockSync(BlockTransferService.scala:122)
at
org.apache.spark.storage.BlockManagerDecommissioner$ShuffleMigrationRunnable.$anonfun$run$4(BlockManagerDecommissioner.scala:120)
at
org.apache.spark.storage.BlockManagerDecommissioner$ShuffleMigrationRunnable.$anonfun$run$4$adapted(BlockManagerDecommissioner.scala:111)
at scala.collection.immutable.List.foreach(List.scala:431)
at
org.apache.spark.storage.BlockManagerDecommissioner$ShuffleMigrationRunnable.run(BlockManagerDecommissioner.scala:111)
at
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.io.IOException: Failed to send RPC RPC 5697756267528635203 to
/10.240.2.65:43481: java.io.FileNotFoundException:
/tmp/blockmgr-98a2a29a-5231-4fed-a82e-6bc0531ad407/15/shuffle_1_356_0.index (No
such file or directory)
at
org.apache.spark.network.client.TransportClient$RpcChannelListener.handleFailure(TransportClient.java:392)
at
org.apache.spark.network.client.TransportClient$StdChannelListener.operationComplete(TransportClient.java:369)
at
io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578)
at
io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:571)
at
io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:550)
at
io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:491)
at
io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:616)
at
io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:609)
at
io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117)
at
io.netty.util.internal.PromiseNotificationUtil.tryFailure(PromiseNotificationUtil.java:64)
at
io.netty.channel.ChannelOutboundBuffer.safeFail(ChannelOutboundBuffer.java:723)
at
io.netty.channel.ChannelOutboundBuffer.remove0(ChannelOutboundBuffer.java:308)
at
io.netty.channel.ChannelOutboundBuffer.failFlushed(ChannelOutboundBuffer.java:660)
at
io.netty.channel.AbstractChannel$AbstractUnsafe.close(AbstractChannel.java:735)
at
io.netty.channel.AbstractChannel$AbstractUnsafe.handleWriteError(AbstractChannel.java:950)
at
io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:933)
at
io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:354)
at
io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:895)
at
io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1372)
at
io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:750)
at
io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:742)
at
io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:728)
at
io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:127)
at
io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:750)
at
io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:765)
at
io.netty.channel.AbstractChannelHandlerContext$WriteTask.run(AbstractChannelHandlerContext.java:1071)
at
io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
at
io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
at
io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
... 1 more
Caused by: java.io.FileNotFoundException:
/tmp/blockmgr-98a2a29a-5231-4fed-a82e-6bc0531ad407/15/shuffle_1_356_0.index (No
such file or directory)
at java.base/java.io.RandomAccessFile.open0(Native Method)
at java.base/java.io.RandomAccessFile.open(RandomAccessFile.java:345)
at java.base/java.io.RandomAccessFile.<init>(RandomAccessFile.java:259)
at java.base/java.io.RandomAccessFile.<init>(RandomAccessFile.java:214)
at io.netty.channel.DefaultFileRegion.open(DefaultFileRegion.java:88)
at io.netty.channel.DefaultFileRegion.transferTo(DefaultFileRegion.java:128)
at
org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:121)
at
org.apache.spark.network.crypto.TransportCipher$EncryptedMessage.encryptMore(TransportCipher.java:347)
at
org.apache.spark.network.crypto.TransportCipher$EncryptedMessage.transferTo(TransportCipher.java:310)
at
io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:362)
at
io.netty.channel.nio.AbstractNioByteChannel.doWriteInternal(AbstractNioByteChannel.java:238)
at
io.netty.channel.nio.AbstractNioByteChannel.doWrite0(AbstractNioByteChannel.java:212)
at
io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:400)
at
io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:931)
... 17 more
22/08/10 18:05:34 WARN BlockManagerDecommissioner: Stop migrating shuffle
blocks to BlockManagerId(0, 10.240.2.65, 43481, None)
{code}
This wrapped exception should be handled explicitly, to avoid an unnecessary
retry of this shuffle block and stopping the current migration thread.
was:
{code:java}
// Some comments here
public String getFoo()
{
return foo;
}
{code}
{code:java}
// code placeholder
{code}
When shuffle files are not found, the decommissioner handles IOException, but the
actual exception thrown is a SparkException wrapping it, as shown below:
```
22/08/10 18:05:34 ERROR BlockManagerDecommissioner: Error occurred during
migrating migrate_shuffle_1_356
org.apache.spark.SparkException: Exception thrown in awaitResult:
at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:301)
at
org.apache.spark.network.BlockTransferService.uploadBlockSync(BlockTransferService.scala:122)
at
org.apache.spark.storage.BlockManagerDecommissioner$ShuffleMigrationRunnable.$anonfun$run$4(BlockManagerDecommissioner.scala:120)
at
org.apache.spark.storage.BlockManagerDecommissioner$ShuffleMigrationRunnable.$anonfun$run$4$adapted(BlockManagerDecommissioner.scala:111)
at scala.collection.immutable.List.foreach(List.scala:431)
at
org.apache.spark.storage.BlockManagerDecommissioner$ShuffleMigrationRunnable.run(BlockManagerDecommissioner.scala:111)
at
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.io.IOException: Failed to send RPC RPC 5697756267528635203 to
/10.240.2.65:43481: java.io.FileNotFoundException:
/tmp/blockmgr-98a2a29a-5231-4fed-a82e-6bc0531ad407/15/shuffle_1_356_0.index (No
such file or directory)
at
org.apache.spark.network.client.TransportClient$RpcChannelListener.handleFailure(TransportClient.java:392)
at
org.apache.spark.network.client.TransportClient$StdChannelListener.operationComplete(TransportClient.java:369)
at
io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578)
at
io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:571)
at
io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:550)
at
io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:491)
at
io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:616)
at
io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:609)
at
io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117)
at
io.netty.util.internal.PromiseNotificationUtil.tryFailure(PromiseNotificationUtil.java:64)
at
io.netty.channel.ChannelOutboundBuffer.safeFail(ChannelOutboundBuffer.java:723)
at
io.netty.channel.ChannelOutboundBuffer.remove0(ChannelOutboundBuffer.java:308)
at
io.netty.channel.ChannelOutboundBuffer.failFlushed(ChannelOutboundBuffer.java:660)
at
io.netty.channel.AbstractChannel$AbstractUnsafe.close(AbstractChannel.java:735)
at
io.netty.channel.AbstractChannel$AbstractUnsafe.handleWriteError(AbstractChannel.java:950)
at
io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:933)
at
io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:354)
at
io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:895)
at
io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1372)
at
io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:750)
at
io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:742)
at
io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:728)
at
io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:127)
at
io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:750)
at
io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:765)
at
io.netty.channel.AbstractChannelHandlerContext$WriteTask.run(AbstractChannelHandlerContext.java:1071)
at
io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
at
io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
at
io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
... 1 more
Caused by: java.io.FileNotFoundException:
/tmp/blockmgr-98a2a29a-5231-4fed-a82e-6bc0531ad407/15/shuffle_1_356_0.index (No
such file or directory)
at java.base/java.io.RandomAccessFile.open0(Native Method)
at java.base/java.io.RandomAccessFile.open(RandomAccessFile.java:345)
at java.base/java.io.RandomAccessFile.<init>(RandomAccessFile.java:259)
at java.base/java.io.RandomAccessFile.<init>(RandomAccessFile.java:214)
at io.netty.channel.DefaultFileRegion.open(DefaultFileRegion.java:88)
at io.netty.channel.DefaultFileRegion.transferTo(DefaultFileRegion.java:128)
at
org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:121)
at
org.apache.spark.network.crypto.TransportCipher$EncryptedMessage.encryptMore(TransportCipher.java:347)
at
org.apache.spark.network.crypto.TransportCipher$EncryptedMessage.transferTo(TransportCipher.java:310)
at
io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:362)
at
io.netty.channel.nio.AbstractNioByteChannel.doWriteInternal(AbstractNioByteChannel.java:238)
at
io.netty.channel.nio.AbstractNioByteChannel.doWrite0(AbstractNioByteChannel.java:212)
at
io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:400)
at
io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:931)
... 17 more
22/08/10 18:05:34 WARN BlockManagerDecommissioner: Stop migrating shuffle
blocks to BlockManagerId(0, 10.240.2.65, 43481, None)
```
> Handle FileNotFoundException when shuffle file deleted in decommissioner
> ------------------------------------------------------------------------
>
> Key: SPARK-40168
> URL: https://issues.apache.org/jira/browse/SPARK-40168
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 3.3.0
> Reporter: Zhongwei Zhu
> Priority: Major
>
> When shuffle files are not found, the decommissioner handles IOException, but
> the actual exception thrown is a SparkException wrapping it, as shown below:
> {code:java}
> 22/08/10 18:05:34 ERROR BlockManagerDecommissioner: Error occurred during
> migrating migrate_shuffle_1_356
> org.apache.spark.SparkException: Exception thrown in awaitResult:
> at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:301)
> at
> org.apache.spark.network.BlockTransferService.uploadBlockSync(BlockTransferService.scala:122)
> at
> org.apache.spark.storage.BlockManagerDecommissioner$ShuffleMigrationRunnable.$anonfun$run$4(BlockManagerDecommissioner.scala:120)
> at
> org.apache.spark.storage.BlockManagerDecommissioner$ShuffleMigrationRunnable.$anonfun$run$4$adapted(BlockManagerDecommissioner.scala:111)
> at scala.collection.immutable.List.foreach(List.scala:431)
> at
> org.apache.spark.storage.BlockManagerDecommissioner$ShuffleMigrationRunnable.run(BlockManagerDecommissioner.scala:111)
> at
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
> at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
> at
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> at
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> at java.base/java.lang.Thread.run(Thread.java:829)
> Caused by: java.io.IOException: Failed to send RPC RPC 5697756267528635203 to
> /10.240.2.65:43481: java.io.FileNotFoundException:
> /tmp/blockmgr-98a2a29a-5231-4fed-a82e-6bc0531ad407/15/shuffle_1_356_0.index
> (No such file or directory)
> at
> org.apache.spark.network.client.TransportClient$RpcChannelListener.handleFailure(TransportClient.java:392)
> at
> org.apache.spark.network.client.TransportClient$StdChannelListener.operationComplete(TransportClient.java:369)
> at
> io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578)
> at
> io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:571)
> at
> io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:550)
> at
> io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:491)
> at
> io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:616)
> at
> io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:609)
> at
> io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117)
> at
> io.netty.util.internal.PromiseNotificationUtil.tryFailure(PromiseNotificationUtil.java:64)
> at
> io.netty.channel.ChannelOutboundBuffer.safeFail(ChannelOutboundBuffer.java:723)
> at
> io.netty.channel.ChannelOutboundBuffer.remove0(ChannelOutboundBuffer.java:308)
> at
> io.netty.channel.ChannelOutboundBuffer.failFlushed(ChannelOutboundBuffer.java:660)
> at
> io.netty.channel.AbstractChannel$AbstractUnsafe.close(AbstractChannel.java:735)
> at
> io.netty.channel.AbstractChannel$AbstractUnsafe.handleWriteError(AbstractChannel.java:950)
> at
> io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:933)
> at
> io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:354)
> at
> io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:895)
> at
> io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1372)
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:750)
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:742)
> at
> io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:728)
> at
> io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:127)
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:750)
> at
> io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:765)
> at
> io.netty.channel.AbstractChannelHandlerContext$WriteTask.run(AbstractChannelHandlerContext.java:1071)
> at
> io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
> at
> io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469)
> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
> at
> io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
> at
> io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
> at
> io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
> ... 1 more
> Caused by: java.io.FileNotFoundException:
> /tmp/blockmgr-98a2a29a-5231-4fed-a82e-6bc0531ad407/15/shuffle_1_356_0.index
> (No such file or directory)
> at java.base/java.io.RandomAccessFile.open0(Native Method)
> at java.base/java.io.RandomAccessFile.open(RandomAccessFile.java:345)
> at java.base/java.io.RandomAccessFile.<init>(RandomAccessFile.java:259)
> at java.base/java.io.RandomAccessFile.<init>(RandomAccessFile.java:214)
> at io.netty.channel.DefaultFileRegion.open(DefaultFileRegion.java:88)
> at
> io.netty.channel.DefaultFileRegion.transferTo(DefaultFileRegion.java:128)
> at
> org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:121)
> at
> org.apache.spark.network.crypto.TransportCipher$EncryptedMessage.encryptMore(TransportCipher.java:347)
> at
> org.apache.spark.network.crypto.TransportCipher$EncryptedMessage.transferTo(TransportCipher.java:310)
> at
> io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:362)
> at
> io.netty.channel.nio.AbstractNioByteChannel.doWriteInternal(AbstractNioByteChannel.java:238)
> at
> io.netty.channel.nio.AbstractNioByteChannel.doWrite0(AbstractNioByteChannel.java:212)
> at
> io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:400)
> at
> io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:931)
> ... 17 more
> 22/08/10 18:05:34 WARN BlockManagerDecommissioner: Stop migrating shuffle
> blocks to BlockManagerId(0, 10.240.2.65, 43481, None)
> {code}
> This wrapped exception should be handled explicitly, to avoid an
> unnecessary retry of this shuffle block and stopping the current migration thread.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]