Re: Spark Streaming - 1.6.0: mapWithState Kinesis huge memory usage

2016-02-06 Thread Udo Fholl
Sorry, I realized I left a bit out of the last email.

This is the only BLOCKED thread in the dump. The Reference Handler is
blocked most likely because the GC was running at the moment of the dump.

"Reference Handler" daemon prio=10 tid=2 BLOCKED
  at java.lang.Object.wait(Native Method)
  at java.lang.Object.wait(Object.java:502)
  at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:157)


On Fri, Feb 5, 2016 at 10:44 AM, Udo Fholl  wrote:

> It does not look like it. Here is the output of "grep -A2 -i waiting
> spark_tdump.log":
>
> "RMI TCP Connection(idle)" daemon prio=5 tid=156 TIMED_WAITING
>   at sun.misc.Unsafe.park(Native Method)
>   at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
> --
> "task-result-getter-1" daemon prio=5 tid=101 WAITING
>   at sun.misc.Unsafe.park(Native Method)
>   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> --
> "BLOCK_MANAGER cleanup timer" daemon prio=5 tid=46 WAITING
>   at java.lang.Object.wait(Native Method)
>   at java.lang.Object.wait(Object.java:502)
> --
> "context-cleaner-periodic-gc" daemon prio=5 tid=69 TIMED_WAITING
>   at sun.misc.Unsafe.park(Native Method)
>   at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
> --
> "qtp512934838-58" daemon prio=5 tid=58 TIMED_WAITING
>   at sun.misc.Unsafe.park(Native Method)
>   at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
> --
> "dispatcher-event-loop-3" daemon prio=5 tid=22 WAITING
>   at sun.misc.Unsafe.park(Native Method)
>   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> --
> "RMI TCP Connection(idle)" daemon prio=5 tid=150 TIMED_WAITING
>   at sun.misc.Unsafe.park(Native Method)
>   at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
> --
> "submit-job-thread-pool-0" daemon prio=5 tid=83 WAITING
>   at java.lang.Object.wait(Native Method)
>   at java.lang.Object.wait(Object.java:502)
> --
> "cw-metrics-publisher" daemon prio=5 tid=90 TIMED_WAITING
>   at java.lang.Object.wait(Native Method)
>   at
> com.amazonaws.services.kinesis.metrics.impl.CWPublisherRunnable.runOnce(CWPublisherRunnable.java:136)
> --
> "qtp512934838-57" daemon prio=5 tid=57 TIMED_WAITING
>   at sun.misc.Unsafe.park(Native Method)
>   at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
> --
> "sparkDriverActorSystem-akka.remote.default-remote-dispatcher-19" daemon
> prio=5 tid=193 WAITING
>   at sun.misc.Unsafe.park(Native Method)
>   at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
> --
> "dispatcher-event-loop-2" daemon prio=5 tid=21 WAITING
>   at sun.misc.Unsafe.park(Native Method)
>   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> --
> "qtp512934838-56" daemon prio=5 tid=56 TIMED_WAITING
>   at sun.misc.Unsafe.park(Native Method)
>   at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
> --
> "BROADCAST_VARS cleanup timer" daemon prio=5 tid=47 WAITING
>   at java.lang.Object.wait(Native Method)
>   at java.lang.Object.wait(Object.java:502)
> --
> "pool-1-thread-1" prio=5 tid=16 TIMED_WAITING
>   at sun.misc.Unsafe.park(Native Method)
>   at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
> --
> "dispatcher-event-loop-0" daemon prio=5 tid=19 WAITING
>   at sun.misc.Unsafe.park(Native Method)
>   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> --
> "RecurringTimer - Kinesis Checkpointer - Worker
> localhost:7b412e3a-f7c8-466d-90f1-deaad8656884" daemon prio=5 tid=89
> TIMED_WAITING
>   at java.lang.Thread.sleep(Native Method)
>   at org.apache.spark.util.SystemClock.waitTillTime(Clock.scala:63)
> --
> "qtp512934838-55" daemon prio=5 tid=55 TIMED_WAITING
>   at sun.misc.Unsafe.park(Native Method)
>   at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
> --
> "Executor task launch worker-0" daemon prio=5 tid=84 WAITING
>   at sun.misc.Unsafe.park(Native Method)
>   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> --
> "qtp512934838-54" daemon prio=5 tid=54 TIMED_WAITING
>   at sun.misc.Unsafe.park(Native Method)
>   at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
> --
> "pool-28-thread-1" prio=5 tid=92 WAITING
>   at sun.misc.Unsafe.park(Native Method)
>   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> --
> "sparkDriverActorSystem-akka.remote.default-remote-dispatcher-18" daemon
> prio=5 tid=185 TIMED_WAITING
>   at sun.misc.Unsafe.park(Native Method)
>   at
> scala.concurrent.forkjoin.ForkJoinPool.idleAwaitWork(ForkJoinPool.java:2135)
> --
> "Spark Context Cleaner" daemon prio=5 tid=68 TIMED_WAITING
>   at java.lang.Object.wait(Native Method)
>   at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143)
> --
> "qtp512934838-53" daemon prio=5 tid=53 TIMED_WAITING
>   at sun.misc.Unsafe.park(Native Method)
>   at 

Re: Spark Streaming - 1.6.0: mapWithState Kinesis huge memory usage

2016-02-05 Thread Udo Fholl
It does not look like it. Here is the output of "grep -A2 -i waiting
spark_tdump.log":

"RMI TCP Connection(idle)" daemon prio=5 tid=156 TIMED_WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
--
"task-result-getter-1" daemon prio=5 tid=101 WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
--
"BLOCK_MANAGER cleanup timer" daemon prio=5 tid=46 WAITING
  at java.lang.Object.wait(Native Method)
  at java.lang.Object.wait(Object.java:502)
--
"context-cleaner-periodic-gc" daemon prio=5 tid=69 TIMED_WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
--
"qtp512934838-58" daemon prio=5 tid=58 TIMED_WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
--
"dispatcher-event-loop-3" daemon prio=5 tid=22 WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
--
"RMI TCP Connection(idle)" daemon prio=5 tid=150 TIMED_WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
--
"submit-job-thread-pool-0" daemon prio=5 tid=83 WAITING
  at java.lang.Object.wait(Native Method)
  at java.lang.Object.wait(Object.java:502)
--
"cw-metrics-publisher" daemon prio=5 tid=90 TIMED_WAITING
  at java.lang.Object.wait(Native Method)
  at
com.amazonaws.services.kinesis.metrics.impl.CWPublisherRunnable.runOnce(CWPublisherRunnable.java:136)
--
"qtp512934838-57" daemon prio=5 tid=57 TIMED_WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
--
"sparkDriverActorSystem-akka.remote.default-remote-dispatcher-19" daemon
prio=5 tid=193 WAITING
  at sun.misc.Unsafe.park(Native Method)
  at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
--
"dispatcher-event-loop-2" daemon prio=5 tid=21 WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
--
"qtp512934838-56" daemon prio=5 tid=56 TIMED_WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
--
"BROADCAST_VARS cleanup timer" daemon prio=5 tid=47 WAITING
  at java.lang.Object.wait(Native Method)
  at java.lang.Object.wait(Object.java:502)
--
"pool-1-thread-1" prio=5 tid=16 TIMED_WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
--
"dispatcher-event-loop-0" daemon prio=5 tid=19 WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
--
"RecurringTimer - Kinesis Checkpointer - Worker
localhost:7b412e3a-f7c8-466d-90f1-deaad8656884" daemon prio=5 tid=89
TIMED_WAITING
  at java.lang.Thread.sleep(Native Method)
  at org.apache.spark.util.SystemClock.waitTillTime(Clock.scala:63)
--
"qtp512934838-55" daemon prio=5 tid=55 TIMED_WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
--
"Executor task launch worker-0" daemon prio=5 tid=84 WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
--
"qtp512934838-54" daemon prio=5 tid=54 TIMED_WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
--
"pool-28-thread-1" prio=5 tid=92 WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
--
"sparkDriverActorSystem-akka.remote.default-remote-dispatcher-18" daemon
prio=5 tid=185 TIMED_WAITING
  at sun.misc.Unsafe.park(Native Method)
  at
scala.concurrent.forkjoin.ForkJoinPool.idleAwaitWork(ForkJoinPool.java:2135)
--
"Spark Context Cleaner" daemon prio=5 tid=68 TIMED_WAITING
  at java.lang.Object.wait(Native Method)
  at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143)
--
"qtp512934838-53" daemon prio=5 tid=53 TIMED_WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
--
"SparkListenerBus" daemon prio=5 tid=18 WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
--
"block-manager-slave-async-thread-pool-6" daemon prio=5 tid=179
TIMED_WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
--
"RMI Scheduler(0)" daemon prio=5 tid=151 TIMED_WAITING
  at sun.misc.Unsafe.park(Native Method)
  at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
--
"Executor task launch worker-1" daemon prio=5 tid=99 WAITING
  at java.lang.Object.wait(Native Method)
  

Re: Spark Streaming - 1.6.0: mapWithState Kinesis huge memory usage

2016-02-04 Thread Udo Fholl
Thank you for your response.

Unfortunately I cannot share a thread dump. What are you looking for
exactly?

Here is the list of the 50 biggest objects (ordered by retained size,
descending):

java.util.concurrent.ArrayBlockingQueue#
java.lang.Object[]#
org.apache.spark.streaming.receiver.BlockGenerator$Block#
scala.collection.mutable.ArrayBuffer#
java.lang.Object[]#
org.apache.spark.streaming.receiver.BlockGenerator#
scala.collection.mutable.ArrayBuffer#
java.lang.Object[]#
scala.concurrent.forkjoin.ForkJoinPool#
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue[]#
org.apache.spark.streaming.receiver.BlockGenerator$Block#
scala.collection.mutable.ArrayBuffer#
java.lang.Object[]#
org.apache.spark.streaming.receiver.BlockGenerator$Block#
scala.collection.mutable.ArrayBuffer#
java.lang.Object[]#
org.apache.spark.storage.MemoryStore#
java.util.LinkedHashMap#
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue#
scala.concurrent.forkjoin.ForkJoinTask[]#
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue#
scala.concurrent.forkjoin.ForkJoinTask[]#
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue#
scala.concurrent.forkjoin.ForkJoinTask[]#
org.apache.spark.streaming.receiver.BlockGenerator$Block#
scala.collection.mutable.ArrayBuffer#
java.lang.Object[]#
org.apache.spark.streaming.receiver.BlockGenerator$Block#
scala.collection.mutable.ArrayBuffer#
java.lang.Object[]#
org.apache.spark.streaming.receiver.BlockGenerator$Block#
scala.collection.mutable.ArrayBuffer#
java.lang.Object[]#
org.apache.spark.streaming.receiver.BlockGenerator$Block#
scala.collection.mutable.ArrayBuffer#
java.lang.Object[]#
org.apache.spark.streaming.receiver.BlockGenerator$Block#
scala.collection.mutable.ArrayBuffer#
java.lang.Object[]#
org.apache.spark.streaming.receiver.BlockGenerator$Block#
scala.collection.mutable.ArrayBuffer#
java.lang.Object[]#
org.apache.spark.streaming.receiver.BlockGenerator$Block#
scala.collection.mutable.ArrayBuffer#
java.lang.Object[]#
scala.collection.Iterator$$anon$
org.apache.spark.InterruptibleIterator#
scala.collection.IndexedSeqLike$Elements#
scala.collection.mutable.ArrayOps$ofRef#
java.lang.Object[]#
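
For context: the biggest entries all hang off the receiver's BlockGenerator.
Roughly, records buffer into an ArrayBuffer, get cut into Blocks on every
block interval, and wait in a bounded ArrayBlockingQueue until a pusher
thread hands them to the BlockManager. Below is a simplified sketch of that
data path, reconstructed from memory of Spark 1.6's BlockGenerator.scala
rather than copied from the actual source, so names and defaults may be off:

import java.util.concurrent.ArrayBlockingQueue
import scala.collection.mutable.ArrayBuffer

// Simplified sketch of the data path the heap dump points at.
class BlockGeneratorSketch(blockQueueSize: Int = 10, blockIntervalMs: Long = 200) {
  case class Block(id: Long, buffer: ArrayBuffer[Any])

  // Records received since the last block interval.
  @volatile private var currentBuffer = new ArrayBuffer[Any]

  // Blocks waiting to be pushed to the BlockManager. The queue is bounded,
  // so if the pusher stalls, put() below blocks and the record buffers pile
  // up on the heap -- which is what the retained sizes above suggest.
  private val blocksForPushing = new ArrayBlockingQueue[Block](blockQueueSize)

  def addData(record: Any): Unit = synchronized { currentBuffer += record }

  // Called by a timer every blockIntervalMs.
  private def cutBlock(): Unit = synchronized {
    if (currentBuffer.nonEmpty) {
      val block = Block(System.currentTimeMillis(), currentBuffer)
      currentBuffer = new ArrayBuffer[Any]
      blocksForPushing.put(block) // blocks the timer thread when the queue is full
    }
  }

  // Runs on a dedicated thread: drain the queue into the BlockManager.
  def keepPushingBlocks(pushToBlockManager: Block => Unit): Unit = {
    while (true) pushToBlockManager(blocksForPushing.take())
  }
}

If the queue and its buffers dominate the dump, the pushing side is the
place to look.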



On Thu, Feb 4, 2016 at 7:14 PM, Shixiong(Ryan) Zhu 
wrote:

> Hey Udo,
>
> mapWithState usually uses much more memory than updateStateByKey since it
> caches the states in memory.
>
> However, from your description, it looks like BlockGenerator cannot push
> data into BlockManager; there may be something wrong in BlockGenerator.
> Could you share the top 50 objects in the heap dump and the thread dump?
>
>
> On Wed, Feb 3, 2016 at 9:52 AM, Udo Fholl  wrote:
>
>> Hi all,
>>
>> I recently migrated from 'updateStateByKey' to 'mapWithState' and now I
>> see a huge increase in memory usage. Most of it is a massive
>> "BlockGenerator" (which points to a massive "ArrayBlockingQueue" that in
>> turn points to a huge "Object[]").
>>
>> I'm pretty sure it has to do with my code, but I barely changed anything
>> in the code; I just adapted the function.
>>
>> Did anyone run into this?
>>
>> Best regards,
>> Udo.
>>
>
>


Re: Spark Streaming - 1.6.0: mapWithState Kinesis huge memory usage

2016-02-04 Thread Shixiong(Ryan) Zhu
I guess it may be a deadlock in BlockGenerator. Could you check it
yourself?
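
A quick way to confirm or rule out an actual JVM-level deadlock (cycles on
monitors or java.util.concurrent locks), as opposed to a thread that is
merely parked on a full queue, is to ask the ThreadMXBean directly. A
minimal sketch, assuming nothing about the job itself:

import java.lang.management.ManagementFactory

object DeadlockCheck {
  def main(args: Array[String]): Unit = {
    val mx = ManagementFactory.getThreadMXBean
    // findDeadlockedThreads returns null when no deadlock is detected.
    val ids = Option(mx.findDeadlockedThreads()).getOrElse(Array.empty[Long])
    if (ids.isEmpty) {
      println("No JVM-level deadlock detected")
    } else {
      // Print the deadlocked threads along with the locks they hold/await.
      mx.getThreadInfo(ids, true, true).foreach(println)
    }
  }
}

Note this only catches lock cycles; a BlockGenerator pusher thread that has
simply stopped draining its queue would still show up as WAITING, so
comparing a couple of thread dumps taken a few minutes apart is also worth
doing.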

On Thu, Feb 4, 2016 at 4:14 PM, Udo Fholl  wrote:

> Thank you for your response.
>
> Unfortunately I cannot share a thread dump. What are you looking for
> exactly?
>
> Here is the list of the 50 biggest objects (ordered by retained size,
> descending):
>
> java.util.concurrent.ArrayBlockingQueue#
> java.lang.Object[]#
> org.apache.spark.streaming.receiver.BlockGenerator$Block#
> scala.collection.mutable.ArrayBuffer#
> java.lang.Object[]#
> org.apache.spark.streaming.receiver.BlockGenerator#
> scala.collection.mutable.ArrayBuffer#
> java.lang.Object[]#
> scala.concurrent.forkjoin.ForkJoinPool#
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue[]#
> org.apache.spark.streaming.receiver.BlockGenerator$Block#
> scala.collection.mutable.ArrayBuffer#
> java.lang.Object[]#
> org.apache.spark.streaming.receiver.BlockGenerator$Block#
> scala.collection.mutable.ArrayBuffer#
> java.lang.Object[]#
> org.apache.spark.storage.MemoryStore#
> java.util.LinkedHashMap#
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue#
> scala.concurrent.forkjoin.ForkJoinTask[]#
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue#
> scala.concurrent.forkjoin.ForkJoinTask[]#
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue#
> scala.concurrent.forkjoin.ForkJoinTask[]#
> org.apache.spark.streaming.receiver.BlockGenerator$Block#
> scala.collection.mutable.ArrayBuffer#
> java.lang.Object[]#
> org.apache.spark.streaming.receiver.BlockGenerator$Block#
> scala.collection.mutable.ArrayBuffer#
> java.lang.Object[]#
> org.apache.spark.streaming.receiver.BlockGenerator$Block#
> scala.collection.mutable.ArrayBuffer#
> java.lang.Object[]#
> org.apache.spark.streaming.receiver.BlockGenerator$Block#
> scala.collection.mutable.ArrayBuffer#
> java.lang.Object[]#
> org.apache.spark.streaming.receiver.BlockGenerator$Block#
> scala.collection.mutable.ArrayBuffer#
> java.lang.Object[]#
> org.apache.spark.streaming.receiver.BlockGenerator$Block#
> scala.collection.mutable.ArrayBuffer#
> java.lang.Object[]#
> org.apache.spark.streaming.receiver.BlockGenerator$Block#
> scala.collection.mutable.ArrayBuffer#
> java.lang.Object[]#
> scala.collection.Iterator$$anon$
> org.apache.spark.InterruptibleIterator#
> scala.collection.IndexedSeqLike$Elements#
> scala.collection.mutable.ArrayOps$ofRef#
> java.lang.Object[]#
>
>
>
>
> On Thu, Feb 4, 2016 at 7:14 PM, Shixiong(Ryan) Zhu <
> shixi...@databricks.com> wrote:
>
>> Hey Udo,
>>
>> mapWithState usually uses much more memory than updateStateByKey since it
>> caches the states in memory.
>>
>> However, from your description, it looks like BlockGenerator cannot push
>> data into BlockManager; there may be something wrong in BlockGenerator.
>> Could you share the top 50 objects in the heap dump and the thread dump?
>>
>>
>> On Wed, Feb 3, 2016 at 9:52 AM, Udo Fholl  wrote:
>>
>>> Hi all,
>>>
>>> I recently migrated from 'updateStateByKey' to 'mapWithState' and now I
>>> see a huge increase in memory usage. Most of it is a massive
>>> "BlockGenerator" (which points to a massive "ArrayBlockingQueue" that in
>>> turn points to a huge "Object[]").
>>>
>>> I'm pretty sure it has to do with my code, but I barely changed anything
>>> in the code; I just adapted the function.
>>>
>>> Did anyone run into this?
>>>
>>> Best regards,
>>> Udo.
>>>
>>
>>
>


Re: Spark Streaming - 1.6.0: mapWithState Kinesis huge memory usage

2016-02-04 Thread Shixiong(Ryan) Zhu
Hey Udo,

mapWithState usually uses much more memory than updateStateByKey since it
caches the states in memory.

However, from your description, it looks like BlockGenerator cannot push
data into BlockManager; there may be something wrong in BlockGenerator.
Could you share the top 50 objects in the heap dump and the thread dump?
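
To make the first point concrete: with mapWithState, every key's state
object stays cached on the executors between batches until it is explicitly
removed or times out, so an unbounded key space keeps growing. A minimal
sketch of the two mechanisms that let state be dropped (a word-count-style
example, not Udo's job):

import org.apache.spark.streaming.{Minutes, State, StateSpec}

// Mapping function: one (key, value) at a time, with a mutable State handle.
val trackCounts = (key: String, value: Option[Int], state: State[Long]) => {
  val count = state.getOption.getOrElse(0L) + value.getOrElse(0)
  if (state.isTimingOut()) {
    // Final call for an idle key: the state is about to be evicted,
    // so update()/remove() must not be called here.
  } else if (count > 1000000L) {
    state.remove()       // 1) explicitly drop the state for this key
  } else {
    state.update(count)  // otherwise keep the running count cached
  }
  (key, count)
}

// 2) evict keys that received no data for 30 minutes
val spec = StateSpec.function(trackCounts).timeout(Minutes(30))

Without a timeout or explicit removal, the per-key state is never released.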


On Wed, Feb 3, 2016 at 9:52 AM, Udo Fholl  wrote:

> Hi all,
>
> I recently migrated from 'updateStateByKey' to 'mapWithState' and now I
> see a huge increase in memory usage. Most of it is a massive
> "BlockGenerator" (which points to a massive "ArrayBlockingQueue" that in
> turn points to a huge "Object[]").
>
> I'm pretty sure it has to do with my code, but I barely changed anything
> in the code; I just adapted the function.
>
> Did anyone run into this?
>
> Best regards,
> Udo.
>


Spark Streaming - 1.6.0: mapWithState Kinesis huge memory usage

2016-02-03 Thread Udo Fholl
Hi all,

I recently migrated from 'updateStateByKey' to 'mapWithState' and now I see
a huge increase in memory usage. Most of it is a massive "BlockGenerator"
(which points to a massive "ArrayBlockingQueue" that in turn points to a
huge "Object[]").

I'm pretty sure it has to do with my code, but I barely changed anything in
the code; I just adapted the function.
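
For the record, the change was roughly of this shape (a simplified sketch
with placeholder key/value/state types, not the actual job):

import org.apache.spark.streaming.{State, StateSpec}
import org.apache.spark.streaming.dstream.DStream

// Before: updateStateByKey folds all of a key's batch values into new state.
def updateFunc(values: Seq[Long], state: Option[Long]): Option[Long] =
  Some(state.getOrElse(0L) + values.sum)

// After: mapWithState sees one value at a time and mutates a State handle.
def mappingFunc(key: String, value: Option[Long], state: State[Long]): (String, Long) = {
  val newSum = state.getOption.getOrElse(0L) + value.getOrElse(0L)
  state.update(newSum)
  (key, newSum)
}

def wire(pairs: DStream[(String, Long)]): Unit = {
  val oldStates = pairs.updateStateByKey(updateFunc)            // old style
  val newStates = pairs.mapWithState(StateSpec.function(mappingFunc _)) // new style
}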

Did anyone run into this?

Best regards,
Udo.