Mayank Jain created SPARK-15661:
-----------------------------------
Summary: mapWithState eating heap memory
Key: SPARK-15661
URL: https://issues.apache.org/jira/browse/SPARK-15661
Project: Spark
Issue Type: Bug
Components: Spark Core
Affects Versions: 1.6.1
Environment: OS : CentOS Linux release 7.2.1511 (Core)
Spark 1.6.1
Kafka 0.9.0.1
zookeeper 3.4.8
Java 1.8.0_65
Reporter: Mayank Jain
Priority: Critical
I am using Spark Streaming, reading data from Kafka (direct stream) at a batch
interval of 30 seconds. Input arrives at a rate of 200,000 records/minute (each
record contains a timestamp).
As per my use case I need to perform per-minute aggregation (my groupBy key
fields include "start_time" in minutes, derived from the input timestamp).
Since I need to perform a cumulative operation, I maintain a state of
aggregated data per micro-batch (at a 30-second interval).
For this purpose I am using mapWithState. I first aggregate my data and then
call mapWithState: if state already exists for a key, another round of
(cumulative) aggregation is done and the state is updated; if state does not
exist, the value is simply written to the state. State is timed out after 3
minutes.
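For reference, the update pattern described above can be modeled in plain Scala (no Spark dependency; the names StateStore, Entry, and upsert are illustrative, not Spark API) — a minimal sketch of "aggregate cumulatively if state exists, else initialize, and drop entries idle longer than 3 minutes":

```scala
import scala.collection.mutable

// Minimal model of per-key cumulative aggregation with an idle timeout,
// mirroring the mapWithState usage described above. Illustrative only.
class StateStore(timeoutMs: Long) {
  private case class Entry(var sum: Long, var lastSeen: Long)
  private val store = mutable.Map[String, Entry]()

  // If state exists for the key, aggregate cumulatively; otherwise create it.
  def upsert(key: String, value: Long, now: Long): Long = {
    val e = store.getOrElseUpdate(key, Entry(0L, now))
    e.sum += value
    e.lastSeen = now
    e.sum
  }

  // Evict keys idle longer than the timeout (what a per-batch timeout pass
  // is expected to do for the real state map).
  def evictExpired(now: Long): Unit = {
    val expired = store.collect { case (k, e) if now - e.lastSeen > timeoutMs => k }
    store --= expired
  }

  def size: Int = store.size
}
```

In the model above, evicted entries become unreachable and are garbage-collectible; the bug reported here is that the real mapWithState state does not shrink in this way.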
What happens is that after a smooth run of my Spark job for a few hours, it
starts showing the following error:
16/05/30 07:05:58 ERROR ShuffleBlockFetcherIterator: Failed to get block(s) from worker3:53492
java.io.IOException: Failed to connect to worker3
    at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:216)
    at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:167)
    at org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:90)
    at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
    at org.apache.spark.network.shuffle.RetryingBlockFetcher.access$200(RetryingBlockFetcher.java:43)
    at org.apache.spark.network.shuffle.RetryingBlockFetcher$1.run(RetryingBlockFetcher.java:170)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.ConnectException: Connection refused: worker3
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
    at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:224)
    at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:289)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
    ... 1 more
On further analysis I found that one of my worker nodes was out of memory.
After taking a heap dump of that worker, I found that mapWithState was eating
up the heap: it was recursively creating state objects nested inside one
another (which is why the garbage collector was not freeing memory on state
timeout).
I have captured my heap as : http://prntscr.com/bah8i7
Please suggest the best possible way to deal with this; I want the state to be
cleared from mapWithState every 3 minutes so that my memory is freed.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]