[ 
https://issues.apache.org/jira/browse/SPARK-45134?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

gaoyajun02 updated SPARK-45134:
-------------------------------
    Description: 
One situation we have found: if the channel is closed while a mergedBlockMeta 
request is in flight, the failure callback may be invoked twice, which results 
in duplicate data when falling back to the original shuffle blocks.
 # The first invocation happens when the channel becomes inactive: the 
responseHandler executes the failure callback for all outstandingRpcs.
 # The second invocation happens when the listener attached to 
shuffleClient.writeAndFlush executes the callback after the channel has been closed.

Some Error Logs:
{code:java}
23/09/08 09:22:21 ERROR shuffle-client-7-1 TransportResponseHandler: Still have 
1 requests outstanding when connection from host/ip:prot is closed
23/09/08 09:22:21 ERROR shuffle-client-7-1 PushBasedFetchHelper: Failed to get 
the meta of push-merged block for (3, 54) from host:port
java.io.IOException: Connection from host:port closed
        at 
org.apache.spark.network.client.TransportResponseHandler.channelInactive(TransportResponseHandler.java:147)
        at 
org.apache.spark.network.server.TransportChannelHandler.channelInactive(TransportChannelHandler.java:117)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
        at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
        at 
io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
        at 
io.netty.handler.timeout.IdleStateHandler.channelInactive(IdleStateHandler.java:277)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
        at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
        at 
io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
        at 
org.apache.spark.network.util.TransportFrameDecoder.channelInactive(TransportFrameDecoder.java:225)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
        at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
        at 
io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1405)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
        at 
io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:901)
        at 
io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:818)
        at 
io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
        at 
io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:497)
        at 
io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
        at 
io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at 
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.run(Thread.java:745)
 
23/09/08 09:22:21 ERROR shuffle-client-7-1 PushBasedFetchHelper: Failed to get 
the meta of push-merged block for (3, 54) from host:port
java.io.IOException: Failed to send RPC RPC 8079698359363123411 to 
host/ip:port: java.nio.channels.ClosedChannelException
        at 
org.apache.spark.network.client.TransportClient$RpcChannelListener.handleFailure(TransportClient.java:433)
        at 
org.apache.spark.network.client.TransportClient$StdChannelListener.operationComplete(TransportClient.java:409)
        at 
io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:577)
        at 
io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:551)
        at 
io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:490)
        at 
io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:615)
        at 
io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:608)
        at 
io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117)
        at 
io.netty.channel.AbstractChannel$AbstractUnsafe.safeSetFailure(AbstractChannel.java:993)
        at 
io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:865)
        at 
io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1367)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:764)
        at 
io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:790)
        at 
io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:758)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:767)
        at 
io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:790)
        at 
io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:758)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:767)
        at 
io.netty.channel.AbstractChannelHandlerContext$WriteTask.run(AbstractChannelHandlerContext.java:1071)
        at 
io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
        at 
io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:497)
        at 
io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
        at 
io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at 
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.nio.channels.ClosedChannelException
        at 
io.netty.channel.AbstractChannel$AbstractUnsafe.newClosedChannelException(AbstractChannel.java:957)
        ... 18 more {code}

  was:
One possible situation that has been found is that, during the process of 
requesting mergedBlockMeta, when the channel is closed, it may trigger two 
callback callbacks and result in duplicate data for the original shuffle blocks.
 # The first time is when the channel is inactivated, the responseHandler will 
execute the callback for all outstandingRpcs.
 # The second time is when the listener corresponding to 
shuffleClient.writeAndFlush executes the callback after the channel is closed.

Some Error Logs
{code:java}
// code placeholder
23/09/08 09:22:21 ERROR shuffle-client-7-1 TransportResponseHandler: Still have 
1 requests outstanding when connection from host/ip:prot is closed
23/09/08 09:22:21 ERROR shuffle-client-7-1 PushBasedFetchHelper: Failed to get 
the meta of push-merged block for (3, 54) from host:port
java.io.IOException: Connection from host:port closed
        at 
org.apache.spark.network.client.TransportResponseHandler.channelInactive(TransportResponseHandler.java:147)
        at 
org.apache.spark.network.server.TransportChannelHandler.channelInactive(TransportChannelHandler.java:117)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
        at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
        at 
io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
        at 
io.netty.handler.timeout.IdleStateHandler.channelInactive(IdleStateHandler.java:277)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
        at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
        at 
io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
        at 
org.apache.spark.network.util.TransportFrameDecoder.channelInactive(TransportFrameDecoder.java:225)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
        at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
        at 
io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1405)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
        at 
io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:901)
        at 
io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:818)
        at 
io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
        at 
io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:497)
        at 
io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
        at 
io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at 
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.run(Thread.java:745)
 
23/09/08 09:22:21 ERROR shuffle-client-7-1 PushBasedFetchHelper: Failed to get 
the meta of push-merged block for (3, 54) from host:port
java.io.IOException: Failed to send RPC RPC 8079698359363123411 to 
host/ip:port: java.nio.channels.ClosedChannelException
        at 
org.apache.spark.network.client.TransportClient$RpcChannelListener.handleFailure(TransportClient.java:433)
        at 
org.apache.spark.network.client.TransportClient$StdChannelListener.operationComplete(TransportClient.java:409)
        at 
io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:577)
        at 
io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:551)
        at 
io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:490)
        at 
io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:615)
        at 
io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:608)
        at 
io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117)
        at 
io.netty.channel.AbstractChannel$AbstractUnsafe.safeSetFailure(AbstractChannel.java:993)
        at 
io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:865)
        at 
io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1367)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:764)
        at 
io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:790)
        at 
io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:758)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:767)
        at 
io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:790)
        at 
io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:758)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:767)
        at 
io.netty.channel.AbstractChannelHandlerContext$WriteTask.run(AbstractChannelHandlerContext.java:1071)
        at 
io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
        at 
io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:497)
        at 
io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
        at 
io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at 
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.nio.channels.ClosedChannelException
        at 
io.netty.channel.AbstractChannel$AbstractUnsafe.newClosedChannelException(AbstractChannel.java:957)
        ... 18 more {code}


> Data duplication may occur when fallback to origin shuffle block
> ----------------------------------------------------------------
>
>                 Key: SPARK-45134
>                 URL: https://issues.apache.org/jira/browse/SPARK-45134
>             Project: Spark
>          Issue Type: Bug
>          Components: Shuffle
>    Affects Versions: 3.2.0
>            Reporter: gaoyajun02
>            Priority: Major
>
> One situation we have found: if the channel is closed while a 
> mergedBlockMeta request is in flight, the failure callback may be invoked 
> twice, which results in duplicate data when falling back to the original 
> shuffle blocks.
>  # The first invocation happens when the channel becomes inactive: the 
> responseHandler executes the failure callback for all outstandingRpcs.
>  # The second invocation happens when the listener attached to 
> shuffleClient.writeAndFlush executes the callback after the channel has been closed.
> Some Error Logs:
> {code:java}
> 23/09/08 09:22:21 ERROR shuffle-client-7-1 TransportResponseHandler: Still 
> have 1 requests outstanding when connection from host/ip:prot is closed
> 23/09/08 09:22:21 ERROR shuffle-client-7-1 PushBasedFetchHelper: Failed to 
> get the meta of push-merged block for (3, 54) from host:port
> java.io.IOException: Connection from host:port closed
>         at 
> org.apache.spark.network.client.TransportResponseHandler.channelInactive(TransportResponseHandler.java:147)
>         at 
> org.apache.spark.network.server.TransportChannelHandler.channelInactive(TransportChannelHandler.java:117)
>         at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
>         at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
>         at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
>         at 
> io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
>         at 
> io.netty.handler.timeout.IdleStateHandler.channelInactive(IdleStateHandler.java:277)
>         at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
>         at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
>         at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
>         at 
> io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
>         at 
> org.apache.spark.network.util.TransportFrameDecoder.channelInactive(TransportFrameDecoder.java:225)
>         at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
>         at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
>         at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
>         at 
> io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1405)
>         at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
>         at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
>         at 
> io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:901)
>         at 
> io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:818)
>         at 
> io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
>         at 
> io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
>         at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:497)
>         at 
> io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
>         at 
> io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
>         at 
> io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
>         at java.lang.Thread.run(Thread.java:745)
>  
> 23/09/08 09:22:21 ERROR shuffle-client-7-1 PushBasedFetchHelper: Failed to 
> get the meta of push-merged block for (3, 54) from host:port
> java.io.IOException: Failed to send RPC RPC 8079698359363123411 to 
> host/ip:port: java.nio.channels.ClosedChannelException
>         at 
> org.apache.spark.network.client.TransportClient$RpcChannelListener.handleFailure(TransportClient.java:433)
>         at 
> org.apache.spark.network.client.TransportClient$StdChannelListener.operationComplete(TransportClient.java:409)
>         at 
> io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:577)
>         at 
> io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:551)
>         at 
> io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:490)
>         at 
> io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:615)
>         at 
> io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:608)
>         at 
> io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117)
>         at 
> io.netty.channel.AbstractChannel$AbstractUnsafe.safeSetFailure(AbstractChannel.java:993)
>         at 
> io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:865)
>         at 
> io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1367)
>         at 
> io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
>         at 
> io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:764)
>         at 
> io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:790)
>         at 
> io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:758)
>         at 
> io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:767)
>         at 
> io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:790)
>         at 
> io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:758)
>         at 
> io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:767)
>         at 
> io.netty.channel.AbstractChannelHandlerContext$WriteTask.run(AbstractChannelHandlerContext.java:1071)
>         at 
> io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
>         at 
> io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
>         at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:497)
>         at 
> io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
>         at 
> io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
>         at 
> io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
>         at java.lang.Thread.run(Thread.java:745)
> Caused by: java.nio.channels.ClosedChannelException
>         at 
> io.netty.channel.AbstractChannel$AbstractUnsafe.newClosedChannelException(AbstractChannel.java:957)
>         ... 18 more {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to