Re: Spark Streaming - 1.6.0: mapWithState Kinesis huge memory usage
Sorry, I realized that I left a bit out of the last email. This is the only BLOCKED thread in the dump. The Reference Handler is blocked most likely because the GC was running at the moment of the dump:

"Reference Handler" daemon prio=10 tid=2 BLOCKED
    at java.lang.Object.wait(Native Method)
    at java.lang.Object.wait(Object.java:502)
    at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:157)

On Fri, Feb 5, 2016 at 10:44 AM, Udo Fholl wrote:
> It does not look like it. Here is the output of "grep -A2 -i waiting
> spark_tdump.log":
> [quoted thread-dump output trimmed; it repeats the previous message in this thread verbatim]
Re: Spark Streaming - 1.6.0: mapWithState Kinesis huge memory usage
It does not look like it. Here is the output of "grep -A2 -i waiting spark_tdump.log":

"RMI TCP Connection(idle)" daemon prio=5 tid=156 TIMED_WAITING
    at sun.misc.Unsafe.park(Native Method)
    at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
--
"task-result-getter-1" daemon prio=5 tid=101 WAITING
    at sun.misc.Unsafe.park(Native Method)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
--
"BLOCK_MANAGER cleanup timer" daemon prio=5 tid=46 WAITING
    at java.lang.Object.wait(Native Method)
    at java.lang.Object.wait(Object.java:502)
--
"context-cleaner-periodic-gc" daemon prio=5 tid=69 TIMED_WAITING
    at sun.misc.Unsafe.park(Native Method)
    at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
--
"qtp512934838-58" daemon prio=5 tid=58 TIMED_WAITING
    at sun.misc.Unsafe.park(Native Method)
    at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
--
"dispatcher-event-loop-3" daemon prio=5 tid=22 WAITING
    at sun.misc.Unsafe.park(Native Method)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
--
"RMI TCP Connection(idle)" daemon prio=5 tid=150 TIMED_WAITING
    at sun.misc.Unsafe.park(Native Method)
    at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
--
"submit-job-thread-pool-0" daemon prio=5 tid=83 WAITING
    at java.lang.Object.wait(Native Method)
    at java.lang.Object.wait(Object.java:502)
--
"cw-metrics-publisher" daemon prio=5 tid=90 TIMED_WAITING
    at java.lang.Object.wait(Native Method)
    at com.amazonaws.services.kinesis.metrics.impl.CWPublisherRunnable.runOnce(CWPublisherRunnable.java:136)
--
"qtp512934838-57" daemon prio=5 tid=57 TIMED_WAITING
    at sun.misc.Unsafe.park(Native Method)
    at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
--
"sparkDriverActorSystem-akka.remote.default-remote-dispatcher-19" daemon prio=5 tid=193 WAITING
    at sun.misc.Unsafe.park(Native Method)
    at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
--
"dispatcher-event-loop-2" daemon prio=5 tid=21 WAITING
    at sun.misc.Unsafe.park(Native Method)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
--
"qtp512934838-56" daemon prio=5 tid=56 TIMED_WAITING
    at sun.misc.Unsafe.park(Native Method)
    at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
--
"BROADCAST_VARS cleanup timer" daemon prio=5 tid=47 WAITING
    at java.lang.Object.wait(Native Method)
    at java.lang.Object.wait(Object.java:502)
--
"pool-1-thread-1" prio=5 tid=16 TIMED_WAITING
    at sun.misc.Unsafe.park(Native Method)
    at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
--
"dispatcher-event-loop-0" daemon prio=5 tid=19 WAITING
    at sun.misc.Unsafe.park(Native Method)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
--
"RecurringTimer - Kinesis Checkpointer - Worker localhost:7b412e3a-f7c8-466d-90f1-deaad8656884" daemon prio=5 tid=89 TIMED_WAITING
    at java.lang.Thread.sleep(Native Method)
    at org.apache.spark.util.SystemClock.waitTillTime(Clock.scala:63)
--
"qtp512934838-55" daemon prio=5 tid=55 TIMED_WAITING
    at sun.misc.Unsafe.park(Native Method)
    at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
--
"Executor task launch worker-0" daemon prio=5 tid=84 WAITING
    at sun.misc.Unsafe.park(Native Method)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
--
"qtp512934838-54" daemon prio=5 tid=54 TIMED_WAITING
    at sun.misc.Unsafe.park(Native Method)
    at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
--
"pool-28-thread-1" prio=5 tid=92 WAITING
    at sun.misc.Unsafe.park(Native Method)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
--
"sparkDriverActorSystem-akka.remote.default-remote-dispatcher-18" daemon prio=5 tid=185 TIMED_WAITING
    at sun.misc.Unsafe.park(Native Method)
    at scala.concurrent.forkjoin.ForkJoinPool.idleAwaitWork(ForkJoinPool.java:2135)
--
"Spark Context Cleaner" daemon prio=5 tid=68 TIMED_WAITING
    at java.lang.Object.wait(Native Method)
    at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143)
--
"qtp512934838-53" daemon prio=5 tid=53 TIMED_WAITING
    at sun.misc.Unsafe.park(Native Method)
    at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
--
"SparkListenerBus" daemon prio=5 tid=18 WAITING
    at sun.misc.Unsafe.park(Native Method)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
--
"block-manager-slave-async-thread-pool-6" daemon prio=5 tid=179 TIMED_WAITING
    at sun.misc.Unsafe.park(Native Method)
    at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
--
"RMI Scheduler(0)" daemon prio=5 tid=151 TIMED_WAITING
    at sun.misc.Unsafe.park(Native Method)
    at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
--
"Executor task launch worker-1" daemon prio=5 tid=99 WAITING
    at java.lang.Object.wait(Native Method)
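As an aside, a filtered listing like the one produced by jstack plus grep above can also be generated from inside the JVM with the standard java.lang.management API. This is a minimal sketch; the class name and the two-frame limit are just illustrative choices mirroring "grep -A2 -i waiting", not anything from the original thread:

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.ArrayList;
import java.util.List;

public class WaitingThreads {
    // Collect one "name tid state" line per WAITING/TIMED_WAITING thread,
    // each followed by up to two stack frames (like `grep -A2 -i waiting`).
    public static List<String> dump() {
        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        List<String> lines = new ArrayList<>();
        for (ThreadInfo info : mx.dumpAllThreads(false, false)) {
            if (info == null) continue;
            Thread.State state = info.getThreadState();
            if (state == Thread.State.WAITING || state == Thread.State.TIMED_WAITING) {
                lines.add(String.format("\"%s\" tid=%d %s",
                        info.getThreadName(), info.getThreadId(), state));
                StackTraceElement[] stack = info.getStackTrace();
                for (int i = 0; i < Math.min(2, stack.length); i++) {
                    lines.add("    at " + stack[i]);
                }
            }
        }
        return lines;
    }

    public static void main(String[] args) {
        dump().forEach(System.out::println);
    }
}
```

Running this in the driver process would show the same parked qtp/dispatcher/cleaner threads without needing an external jstack.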
Re: Spark Streaming - 1.6.0: mapWithState Kinesis huge memory usage
Thank you for your response.

Unfortunately I cannot share a thread dump. What are you looking for exactly?

Here is the list of the 50 biggest objects (retained size order, descending):

java.util.concurrent.ArrayBlockingQueue#
java.lang.Object[]#
org.apache.spark.streaming.receiver.BlockGenerator$Block#
scala.collection.mutable.ArrayBuffer#
java.lang.Object[]#
org.apache.spark.streaming.receiver.BlockGenerator#
scala.collection.mutable.ArrayBuffer#
java.lang.Object[]#
scala.concurrent.forkjoin.ForkJoinPool#
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue[]#
org.apache.spark.streaming.receiver.BlockGenerator$Block#
scala.collection.mutable.ArrayBuffer#
java.lang.Object[]#
org.apache.spark.streaming.receiver.BlockGenerator$Block#
scala.collection.mutable.ArrayBuffer#
java.lang.Object[]#
org.apache.spark.storage.MemoryStore#
java.util.LinkedHashMap#
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue#
scala.concurrent.forkjoin.ForkJoinTask[]#
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue#
scala.concurrent.forkjoin.ForkJoinTask[]#
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue#
scala.concurrent.forkjoin.ForkJoinTask[]#
org.apache.spark.streaming.receiver.BlockGenerator$Block#
scala.collection.mutable.ArrayBuffer#
java.lang.Object[]#
org.apache.spark.streaming.receiver.BlockGenerator$Block#
scala.collection.mutable.ArrayBuffer#
java.lang.Object[]#
org.apache.spark.streaming.receiver.BlockGenerator$Block#
scala.collection.mutable.ArrayBuffer#
java.lang.Object[]#
org.apache.spark.streaming.receiver.BlockGenerator$Block#
scala.collection.mutable.ArrayBuffer#
java.lang.Object[]#
org.apache.spark.streaming.receiver.BlockGenerator$Block#
scala.collection.mutable.ArrayBuffer#
java.lang.Object[]#
org.apache.spark.streaming.receiver.BlockGenerator$Block#
scala.collection.mutable.ArrayBuffer#
java.lang.Object[]#
org.apache.spark.streaming.receiver.BlockGenerator$Block#
scala.collection.mutable.ArrayBuffer#
java.lang.Object[]#
scala.collection.Iterator$$anon$
org.apache.spark.InterruptibleIterator#
scala.collection.IndexedSeqLike$Elements#
scala.collection.mutable.ArrayOps$ofRef#
java.lang.Object[]#

On Thu, Feb 4, 2016 at 7:14 PM, Shixiong(Ryan) Zhu wrote:
> Hey Udo,
>
> mapWithState usually uses much more memory than updateStateByKey since it
> caches the states in memory.
>
> However, from your description, it looks like BlockGenerator cannot push
> data into BlockManager; there may be something wrong in BlockGenerator.
> Could you share the top 50 objects in the heap dump and the thread dump?
> [quoted original message trimmed; it appears in full earlier in this thread]
Re: Spark Streaming - 1.6.0: mapWithState Kinesis huge memory usage
I guess it may be some deadlock in BlockGenerator. Could you check that yourself?

On Thu, Feb 4, 2016 at 4:14 PM, Udo Fholl wrote:
> Thank you for your response.
>
> Unfortunately I cannot share a thread dump. What are you looking for
> exactly?
>
> Here is the list of the 50 biggest objects (retained size order,
> descending):
> [quoted object list and earlier messages trimmed; they repeat the previous
> messages in this thread verbatim]
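To picture the kind of deadlock being suggested here: if the consumer side of a bounded ArrayBlockingQueue stops draining, the producer parks forever inside put(), showing exactly the sun.misc.Unsafe.park / LockSupport frames from the dumps. This is a hedged toy reproduction with made-up names, not Spark's actual BlockGenerator code:

```java
import java.util.concurrent.ArrayBlockingQueue;

public class StalledProducerDemo {
    public static void main(String[] args) throws InterruptedException {
        // Tiny bounded queue standing in for BlockGenerator's block queue.
        ArrayBlockingQueue<Object[]> queue = new ArrayBlockingQueue<>(2);

        Thread producer = new Thread(() -> {
            try {
                while (true) {
                    // Once the queue is full and nothing drains it, put()
                    // blocks forever in LockSupport.park (state WAITING).
                    queue.put(new Object[1024]);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "toy-block-generator");
        producer.setDaemon(true);
        producer.start();

        Thread.sleep(500); // let the producer fill the queue and park
        System.out.println("queue size = " + queue.size());        // prints: queue size = 2
        System.out.println("producer state = " + producer.getState()); // prints: producer state = WAITING
    }
}
```

A thread dump of this toy program shows "toy-block-generator" parked in ArrayBlockingQueue.put, which is the signature to look for in the real dump.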
Re: Spark Streaming - 1.6.0: mapWithState Kinesis huge memory usage
Hey Udo,

mapWithState usually uses much more memory than updateStateByKey since it caches the states in memory.

However, from your description, it looks like BlockGenerator cannot push data into BlockManager; there may be something wrong in BlockGenerator. Could you share the top 50 objects in the heap dump and the thread dump?

On Wed, Feb 3, 2016 at 9:52 AM, Udo Fholl wrote:
> Hi all,
>
> I recently migrated from 'updateStateByKey' to 'mapWithState' and now I
> see a huge increase in memory usage. Most of it is a massive
> "BlockGenerator" (which points to a massive "ArrayBlockingQueue" that in
> turn points to a huge "Object[]").
>
> I'm pretty sure it has to do with my code, but I barely changed anything
> in the code. I just adapted the function.
>
> Did anyone run into this?
>
> Best regards,
> Udo.
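The memory difference described here can be pictured with a toy state store: a mapWithState-style operator keeps every key's state object cached between batches, so resident memory grows with key cardinality rather than being recomputed per batch. A hypothetical sketch only, not Spark's actual StateMap implementation:

```java
import java.util.HashMap;
import java.util.Map;

public class ToyStateStore {
    // Per-key state kept alive across batches (the mapWithState model);
    // with updateStateByKey-style recomputation this map would not persist.
    private final Map<String, Long> states = new HashMap<>();

    // Fold one increment into a key's running total and return the new value.
    public long update(String key, long increment) {
        return states.merge(key, increment, Long::sum);
    }

    // Heap usage grows with the number of distinct keys ever seen.
    public int cachedKeys() {
        return states.size();
    }
}
```

The point of the toy: nothing here ever evicts, so a high-cardinality key space alone explains a large, steadily growing heap, independent of any BlockGenerator problem.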
Spark Streaming - 1.6.0: mapWithState Kinesis huge memory usage
Hi all,

I recently migrated from 'updateStateByKey' to 'mapWithState' and now I see a huge increase in memory usage. Most of it is a massive "BlockGenerator" (which points to a massive "ArrayBlockingQueue" that in turn points to a huge "Object[]").

I'm pretty sure it has to do with my code, but I barely changed anything in the code. I just adapted the function.

Did anyone run into this?

Best regards,
Udo.
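The retention chain described here (ArrayBlockingQueue pointing at a huge Object[]) is what heap tools report whenever a bounded queue fills up: every enqueued block stays strongly reachable through the queue's backing array until something dequeues it. A small illustrative sketch, with made-up names and sizes:

```java
import java.util.concurrent.ArrayBlockingQueue;

public class QueueRetentionDemo {
    public static void main(String[] args) {
        // Stand-in for the receiver's internal queue of generated blocks.
        ArrayBlockingQueue<long[]> blockQueue = new ArrayBlockingQueue<>(100);

        for (int i = 0; i < 10; i++) {
            // Each block (~8 MB of longs here) stays strongly reachable
            // through the queue's backing Object[] until it is dequeued --
            // the ArrayBlockingQueue -> Object[] chain a heap dump reports.
            blockQueue.offer(new long[1_000_000]);
        }

        System.out.println("blocks retained = " + blockQueue.size()); // prints: blocks retained = 10
        blockQueue.clear(); // only consuming/draining releases the memory
        System.out.println("blocks retained = " + blockQueue.size()); // prints: blocks retained = 0
    }
}
```

So a "massive ArrayBlockingQueue" in a heap dump usually means the consumer of the queue has fallen behind or stopped, not that the queue itself leaks.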