Duarte Ferreira created SPARK-32893:
---------------------------------------

             Summary: Structured Streaming and Dynamic Allocation on Standalone Cluster
                 Key: SPARK-32893
                 URL: https://issues.apache.org/jira/browse/SPARK-32893
             Project: Spark
          Issue Type: Bug
          Components: Structured Streaming
    Affects Versions: 3.0.1
            Reporter: Duarte Ferreira


We are currently using a Spark 3.0.1 standalone cluster to run our Structured Streaming applications.

We set the following configurations when running the application in cluster mode (a spark-submit sketch follows the list):
 * spark.dynamicAllocation.enabled = true
 * spark.shuffle.service.enabled = true
 * spark.cores.max = 5
 * spark.executor.memory = 1G
 * spark.executor.cores = 1
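
For completeness, we submit roughly like this (the jar name is a placeholder; the master URL matches the one that appears in the logs below):

{noformat}
spark-submit \
  --master spark://sparkmaster:7077 \
  --deploy-mode cluster \
  --conf spark.dynamicAllocation.enabled=true \
  --conf spark.shuffle.service.enabled=true \
  --conf spark.cores.max=5 \
  --conf spark.executor.memory=1G \
  --conf spark.executor.cores=1 \
  our-streaming-app.jar
{noformat}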

We also have the external shuffle service (spark.shuffle.service.enabled) enabled on each worker, and the cluster is composed of 1 master and 2 workers.

The application reads data from a Kafka topic (readTopic) following [the Structured Streaming + Kafka integration guide|https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html], applies some transformations to the Dataset using Spark SQL, and writes the data to another Kafka topic (writeTopic), roughly as sketched below.
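
In code terms the pipeline looks roughly like this (a minimal sketch: the app name, broker address, checkpoint path, and the actual Spark SQL transformation are placeholders, not our real ones):

{noformat}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().appName("StreamingApp").getOrCreate()
import spark.implicits._

// Read from the input topic ("kafka1:9092" is a placeholder broker).
val input = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "kafka1:9092")
  .option("subscribe", "readTopic")
  .load()

// Spark SQL transformation (simplified placeholder for our real logic).
val transformed = input
  .selectExpr("CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value")
  .withColumn("value", upper($"value"))

// Write to the output topic; checkpoint path is a placeholder.
val query = transformed.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "kafka1:9092")
  .option("topic", "writeTopic")
  .option("checkpointLocation", "/tmp/checkpoints/app")
  .start()

query.awaitTermination()
{noformat}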

When we start the application it behaves correctly: it starts with 0 executors and, as we start feeding data to the readTopic, it increases the number of executors until it reaches the 5-executor limit, and all messages are transformed and written to the writeTopic in Kafka.

If we stop feeding messages to the readTopic, the application also works as expected: it starts killing executors that are no longer needed until, once we stop sending data completely, it reaches 0 running executors. (The dynamic allocation timing knobs we have left at their defaults are listed below.)
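
For context, we have not overridden any of the dynamic allocation timing properties, so as far as we understand the configuration docs the scale-up/scale-down cadence is governed by these defaults (quoted for reference):

{noformat}
spark.dynamicAllocation.schedulerBacklogTimeout=1s            # request executors after tasks have been pending this long
spark.dynamicAllocation.sustainedSchedulerBacklogTimeout=1s   # defaults to schedulerBacklogTimeout
spark.dynamicAllocation.executorIdleTimeout=60s               # remove an executor that has been idle this long
{noformat}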

If we start sending data again right away, it behaves just as expected and starts increasing the number of executors again. But if we leave the application idle at 0 executors for around 10 minutes, we start getting errors like this:
{noformat}
20/09/15 10:41:22 ERROR TransportClient: Failed to send RPC RPC 7570256331800450365 to sparkmaster/10.0.12.231:7077: java.nio.channels.ClosedChannelException
java.nio.channels.ClosedChannelException
        at io.netty.channel.AbstractChannel$AbstractUnsafe.newClosedChannelException(AbstractChannel.java:957)
        at io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:865)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1367)
        at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
        at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:764)
        at io.netty.channel.AbstractChannelHandlerContext$WriteTask.run(AbstractChannelHandlerContext.java:1104)
        at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
        at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Connection reset by peer
        at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
        at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
        at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
        at sun.nio.ch.IOUtil.write(IOUtil.java:65)
        at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:468)
        at org.apache.spark.network.protocol.MessageWithHeader.copyByteBuf(MessageWithHeader.java:148)
        at org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:123)
        at io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:362)
        at io.netty.channel.nio.AbstractNioByteChannel.doWriteInternal(AbstractNioByteChannel.java:235)
        at io.netty.channel.nio.AbstractNioByteChannel.doWrite0(AbstractNioByteChannel.java:209)
        at io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:400)
        at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:930)
        at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:354)
        at io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:897)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1372)
        at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:750)
        at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:742)
        at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:728)
        at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:127)
        at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:750)
        at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:765)
        ... 8 more
20/09/15 10:41:22 WARN ExecutorAllocationManager: Unable to reach the cluster manager to request 1 total executors!

{noformat}
If we restart the master node, everything works again; likewise, if we restart the Spark application, everything starts working again.

All nodes can ping the master node, and they can still start other applications or kill running ones; the problem seems to occur only when requesting additional executors.

Is this a bug, or are we missing some configuration/timeout? (The timeout-related properties we have already checked are listed below.)
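
For reference, these are the timeout-related properties we have looked at, all still at their documented defaults; we cannot tell whether any of them applies to the driver's connection to the master going stale while the application sits at 0 executors:

{noformat}
spark.network.timeout=120s   # default timeout for all network interactions
spark.rpc.askTimeout=120s    # falls back to spark.network.timeout when not set explicitly
spark.worker.timeout=60      # seconds after which the master considers a worker lost
{noformat}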


