[ 
https://issues.apache.org/jira/browse/SPARK-32893?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17703177#comment-17703177
 ] 

Ranga Reddy commented on SPARK-32893:
-------------------------------------

I can see similar behaviour Spark Structured Streaming with Yarn.
{code:java}
2023-03-14 18:17:29 ERROR TransportClient:337 - Failed to send RPC RPC 
7955407071046657873 to /127.0.0.1:50040: 
java.nio.channels.ClosedChannelException
java.nio.channels.ClosedChannelException
        at 
io.netty.channel.AbstractChannel$AbstractUnsafe.newClosedChannelException(AbstractChannel.java:957)
        at 
io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:865)
        at 
io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1367)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:764)
        at 
io.netty.channel.AbstractChannelHandlerContext$WriteTask.run(AbstractChannelHandlerContext.java:1104)
        at 
io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
        at 
io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
        at 
io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
        at 
io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at 
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.run(Thread.java:748) {code}

> Structured Streaming and Dynamic Allocation on StandaloneCluster
> ----------------------------------------------------------------
>
>                 Key: SPARK-32893
>                 URL: https://issues.apache.org/jira/browse/SPARK-32893
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 3.0.1
>            Reporter: Duarte Ferreira
>            Priority: Major
>
> We are currently using Spark 3.0.1 Standalone cluster to run our Structured 
> streaming applications.
> We set the following configurations when running the application in cluster 
> mode:
>  * spark.dynamicAllocation.enabled = true
>  * spark.shuffle.service.enabled = true
>  * spark.cores.max =5
>  * spark.executor.memory = 1G
>  * spark.executor.cores = 1
> We also have the configurations set to enable spark.shuffle.service.enabled 
> on each worker and have a cluster composed of 1 master and 2 slaves.
> The application reads data from a kafka Topic (readTopic) using [This 
> documentation, 
> |https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html]applies
>  some transformations on the DataSet using spark SQL and writes data to 
> another Kafka Topic (writeTopic).
> When we start the application it behaves correctly, it starts with 0 
> executors and. as we start feeding data to the readTopic, it starts 
> increasing the number of executors until it reaches the 5 executors limit and 
> all messages are transformed and written to the writeTopic in Kafka.
> If we stop feeding messages to the readTopic the application will work as 
> expected and starts killing executors that are not needed anymore until we 
> stop sending data completely and it reach 0 executors running.
> If we start sending data again right away, it behaves just as expected it 
> starts increasing the numbers of executors again. But if we leave the 
> application in idle at 0 executors for around 10 minutes we start getting 
> errors like this:
> {noformat}
> *no*
> 20/09/15 10:41:22 ERROR TransportClient: Failed to send RPC RPC 
> 7570256331800450365 to sparkmaster/10.0.12.231:7077: 
> java.nio.channels.ClosedChannelException
> java.nio.channels.ClosedChannelException
>       at 
> io.netty.channel.AbstractChannel$AbstractUnsafe.newClosedChannelException(AbstractChannel.java:957)
>       at 
> io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:865)
>       at 
> io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1367)
>       at 
> io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
>       at 
> io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:764)
>       at 
> io.netty.channel.AbstractChannelHandlerContext$WriteTask.run(AbstractChannelHandlerContext.java:1104)
>       at 
> io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
>       at 
> io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
>       at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
>       at 
> io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
>       at 
> io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
>       at 
> io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
>       at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.IOException: Connection reset by peer
>       at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
>       at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
>       at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
>       at sun.nio.ch.IOUtil.write(IOUtil.java:65)
>       at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:468)
>       at 
> org.apache.spark.network.protocol.MessageWithHeader.copyByteBuf(MessageWithHeader.java:148)
>       at 
> org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:123)
>       at 
> io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:362)
>       at 
> io.netty.channel.nio.AbstractNioByteChannel.doWriteInternal(AbstractNioByteChannel.java:235)
>       at 
> io.netty.channel.nio.AbstractNioByteChannel.doWrite0(AbstractNioByteChannel.java:209)
>       at 
> io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:400)
>       at 
> io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:930)
>       at 
> io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:354)
>       at 
> io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:897)
>       at 
> io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1372)
>       at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:750)
>       at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:742)
>       at 
> io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:728)
>       at 
> io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:127)
>       at 
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:750)
>       at 
> io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:765)
>       ... 8 more
> 20/09/15 10:41:22 WARN ExecutorAllocationManager: Unable to reach the cluster 
> manager to request 1 total executors!
> {noformat}
> If we restart the master node, everything works again, if we restart the 
> spark app, everything starts working again.
> All nodes can ping the master node and they can start other applications or 
> kill the ones running, there seams to be a problem only when increasing the 
> executors.
> Is this a Bug or are we missing some configuration/timeout?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to