[ 
https://issues.apache.org/jira/browse/SPARK-15661?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sean Owen resolved SPARK-15661.
-------------------------------
    Resolution: Not A Problem

As described, this is not (necessarily) a problem. Your state may be holding a 
reference to your previous state; it depends on how you wrote your app. At this 
stage this is a question, so it should go to user@
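
One way a state can end up "referring to your previous state" is sketched 
below in plain Java. This is only an illustration under assumptions: it does 
not use the Spark API, it is not the reporter's code, and every class and 
method name (StateChainDemo, ChainedState, FlatState, updateChained, 
updateFlat, reachable) is hypothetical. The per-batch count of 100,000 is 
derived from the 200,000/minute rate and 30-second batches quoted in the issue.

```java
// Hypothetical illustration only; none of these names come from Spark
// or from the reporter's application.
final class StateChainDemo {

    // Leaky shape: each new state keeps a reference to the state it replaced,
    // so every older state stays reachable from the newest one.
    static final class ChainedState {
        final long count;
        final ChainedState previous;

        ChainedState(long count, ChainedState previous) {
            this.count = count;
            this.previous = previous;
        }
    }

    // Flat shape: the new state carries only the merged value.
    static final class FlatState {
        final long total;

        FlatState(long total) {
            this.total = total;
        }
    }

    // Wraps the old state inside the new one (the nesting seen in a heap dump).
    static ChainedState updateChained(ChainedState old, long batchCount) {
        return new ChainedState(batchCount, old);
    }

    // Merges the batch into a fresh value and drops the old state object.
    static FlatState updateFlat(FlatState old, long batchCount) {
        long base = (old == null) ? 0L : old.total;
        return new FlatState(base + batchCount);
    }

    // Number of state objects reachable from the latest chained state.
    static int reachable(ChainedState latest) {
        int n = 0;
        for (ChainedState cur = latest; cur != null; cur = cur.previous) {
            n++;
        }
        return n;
    }

    public static void main(String[] args) {
        ChainedState chained = null;
        FlatState flat = null;
        // Six 30-second batches cover the 3-minute window from the issue.
        for (int batch = 0; batch < 6; batch++) {
            chained = updateChained(chained, 100_000L);
            flat = updateFlat(flat, 100_000L);
        }
        System.out.println("objects retained by chained state: " + reachable(chained));
        System.out.println("flat total: " + flat.total);
    }
}
```

With the chained shape, the number of retained objects grows with every 
batch for as long as the key stays live; with the flat shape it stays at one 
object per key. Whether the reporter's job has this shape cannot be told from 
the issue text alone.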

> mapWithState eating heap memory
> -------------------------------
>
>                 Key: SPARK-15661
>                 URL: https://issues.apache.org/jira/browse/SPARK-15661
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 1.6.1
>         Environment: OS : CentOS Linux release 7.2.1511 (Core) 
> Spark 1.6.1 
> Kafka 0.9.0.1 
> zookeeper 3.4.8 
> Java 1.8.0_65
>            Reporter: Mayank Jain
>            Priority: Critical
>
> I am using Spark Streaming, reading data from Kafka (direct stream) at a 
> batch interval of 30 seconds. Input arrives at a rate of 200,000/minute 
> (the input contains a timestamp).
> For my use case I need to perform per-minute aggregation (my group-by key 
> fields include "start_time" in minutes, derived from the input timestamp). 
> Since I need a cumulative result, I maintain a state of aggregated data per 
> micro-batch (at a 30-second interval).
> For this purpose I am using mapWithState. I aggregate my data and then 
> apply mapWithState: if state already exists, another round of (cumulative) 
> aggregation is done and the state is updated; if state does not exist, the 
> aggregate is simply stored as the state (appending it to the state). State 
> times out after 3 minutes.
> What happens is that, after my Spark job has run smoothly for a few hours, 
> it starts showing the following error: 
> 16/05/30 07:05:58 ERROR ShuffleBlockFetcherIterator: Failed to get block(s) from worker3:53492
> java.io.IOException: Failed to connect to worker3
>       at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:216)
>       at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:167)
>       at org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:90)
>       at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
>       at org.apache.spark.network.shuffle.RetryingBlockFetcher.access$200(RetryingBlockFetcher.java:43)
>       at org.apache.spark.network.shuffle.RetryingBlockFetcher$1.run(RetryingBlockFetcher.java:170)
>       at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>       at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> Caused by: java.net.ConnectException: Connection refused: worker3
>       at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>       at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
>       at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:224)
>       at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:289)
>       at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
>       at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>       at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>       at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>       at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
>       ... 1 more
> On further analysis I found that one of my worker nodes was out of memory. 
> After taking a heap dump of that worker I found that mapWithState was 
> eating up the heap. It was recursively creating state nested inside another 
> state, inside another, and so on (which is why the garbage collector was 
> not freeing memory on state timeout).
> I have captured my heap here: http://prntscr.com/bah8i7
> Please suggest the best way to deal with this. I want my state to be 
> cleared from mapWithState every 3 minutes so that the memory is freed.
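
The update-or-create logic and 3-minute timeout described in the issue can 
be modelled roughly as below. This is a plain-Java simulation, not Spark's 
mapWithState implementation and not the reporter's code; it assumes the 
timeout is idle-based (a key is dropped once it has received no update for 
the timeout duration). The names TimedStateSketch, Entry, update, evictIdle 
and size are hypothetical.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Hypothetical model of per-key cumulative state with an idle timeout.
final class TimedStateSketch {

    // 3-minute timeout, taken from the issue description.
    static final long TIMEOUT_MS = 3 * 60 * 1000L;

    // One flat record per key: running total plus last update time.
    static final class Entry {
        long total;
        long lastUpdatedMs;

        Entry(long total, long lastUpdatedMs) {
            this.total = total;
            this.lastUpdatedMs = lastUpdatedMs;
        }
    }

    private final Map<String, Entry> states = new HashMap<>();

    // If state exists, fold the batch aggregate into it; otherwise create it.
    // Returns the cumulative total for the key.
    long update(String key, long batchTotal, long nowMs) {
        Entry e = states.get(key);
        if (e == null) {
            e = new Entry(batchTotal, nowMs);
            states.put(key, e);
        } else {
            e.total += batchTotal;
            e.lastUpdatedMs = nowMs;
        }
        return e.total;
    }

    // Removes every key that has been idle for TIMEOUT_MS or longer.
    // Returns the number of keys removed.
    int evictIdle(long nowMs) {
        int removed = 0;
        Iterator<Entry> it = states.values().iterator();
        while (it.hasNext()) {
            if (nowMs - it.next().lastUpdatedMs >= TIMEOUT_MS) {
                it.remove();
                removed++;
            }
        }
        return removed;
    }

    int size() {
        return states.size();
    }
}
```

Because each key holds a single flat Entry, evicting the key leaves nothing 
else reachable, so the garbage collector can reclaim it. A key that keeps 
receiving data within the timeout is never evicted in this model.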



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
