Mayank Jain created SPARK-15661:
-----------------------------------

             Summary: mapWithState eating heap memory
                 Key: SPARK-15661
                 URL: https://issues.apache.org/jira/browse/SPARK-15661
             Project: Spark
          Issue Type: Bug
          Components: Spark Core
    Affects Versions: 1.6.1
         Environment: OS : CentOS Linux release 7.2.1511 (Core) 
Spark 1.6.1 
Kafka 0.9.0.1 
zookeeper 3.4.8 
Java 1.8.0_65
            Reporter: Mayank Jain
            Priority: Critical


I am using Spark Streaming, reading data from Kafka (direct stream) at a batch 
interval of 30 seconds. Input arrives at a rate of 200,000 records/minute (each 
record contains a timestamp).
My use case requires per-minute aggregation (my groupBy key fields include a 
"start_time" in minutes, derived from the input timestamp). Since I need to 
perform a cumulative operation, I maintain a state of aggregated data per 
micro-batch (at the 30-second interval).
For this purpose I am using mapWithState. I aggregate my data within each batch 
and then call mapWithState: if state already exists for a key, another round of 
(cumulative) aggregation is performed and the state is updated; if no state 
exists, the batch value is simply written to the state. The state is timed out 
after 3 minutes.
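For reference, a minimal sketch of the update pattern described above, written against the Spark 1.6 mapWithState API. The key/value types (AggKey, Agg) and the merge function are hypothetical placeholders for my actual groupBy fields and aggregation logic:

```scala
import org.apache.spark.streaming.{Minutes, State, StateSpec}

// Hypothetical types standing in for my real groupBy key and aggregate.
case class AggKey(startTimeMinute: Long)          // plus other groupBy fields
case class Agg(count: Long, sum: Double)

def merge(a: Agg, b: Agg): Agg = Agg(a.count + b.count, a.sum + b.sum)

// Update function: combine the batch value with any existing state.
val mappingFunc = (key: AggKey, value: Option[Agg], state: State[Agg]) => {
  // Guard: state.update() throws if the key is timing out in this batch.
  if (!state.isTimingOut()) {
    val merged = (state.getOption(), value) match {
      case (Some(prev), Some(cur)) => merge(prev, cur) // cumulative aggregation
      case (None, Some(cur))       => cur              // first sight: seed state
      case (prev, None)            => prev.getOrElse(Agg(0L, 0.0))
    }
    state.update(merged)
  }
  (key, state.getOption())
}

// Time out idle keys after 3 minutes, as in my job.
val spec = StateSpec.function(mappingFunc).timeout(Minutes(3))

// perBatchAgg: DStream[(AggKey, Agg)] produced by reduceByKey on the
// 30-second micro-batch, then carried into the keyed state:
// val stateStream = perBatchAgg.mapWithState(spec)
```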

After a smooth run of a few hours, the job starts showing the following error: 
16/05/30 07:05:58 ERROR ShuffleBlockFetcherIterator: Failed to get block(s) 
from worker3:53492
java.io.IOException: Failed to connect to worker3
        at 
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:216)
        at 
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:167)
        at 
org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:90)
        at 
org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
        at 
org.apache.spark.network.shuffle.RetryingBlockFetcher.access$200(RetryingBlockFetcher.java:43)
        at 
org.apache.spark.network.shuffle.RetryingBlockFetcher$1.run(RetryingBlockFetcher.java:170)
        at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.ConnectException: Connection refused: worker3
        at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
        at 
sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
        at 
io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:224)
        at 
io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:289)
        at 
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
        at 
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
        at 
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
        at 
io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
        ... 1 more

On further analysis I found that one of my worker nodes was out of memory. 
After taking a heap dump of that worker, I found that mapWithState was eating 
up the heap. It was recursively creating state maps nested inside one another 
(which is why the garbage collector was not freeing memory on state timeout).
I have captured my heap here: http://prntscr.com/bah8i7

Please suggest the best possible way to deal with this; I want the state to be 
cleared from mapWithState every 3 minutes so my memory is freed.




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
