I've done several thread dumps, and the two dispatcher pools dealing with
Cassandra have all their threads in WAITING or TIMED_WAITING. Based on their
call stacks, they appear to be idle rather than doing any work (such as
waiting for Cassandra to return), unless I'm reading them wrong.

"manhattan-cassandra-plugin-default-dispatcher-42359" #47941 prio=5 os_prio=0 tid=0x00007f33e805c800 nid=0x3d35 waiting on condition [0x00007f336eec6000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <0x00000005c19a8e28> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
        at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

"manhattan-cassandra-plugin-blocking-dispatcher-42441" #48090 prio=5 os_prio=0 tid=0x00007f33f016b000 nid=0x3e2e waiting on condition [0x00007f33805f5000]
   java.lang.Thread.State: TIMED_WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <0x00000006c37a33f0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
        at java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:467)
        at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1066)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

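To double-check this reading of the dumps from inside the JVM, you can classify dispatcher threads by state and by their stack frames. The sketch below uses only the standard java.lang.management API. `DispatcherThreadCheck` is a hypothetical helper. Its list of "idle" frames is an assumption drawn from the stacks above: ForkJoinPool.scan for the default dispatcher and ThreadPoolExecutor.getTask for the blocking one. It is not an exhaustive rule. A thread whose stack contains one of those frames is a pool worker parked while it waits for its next task. That is different from a thread blocked on a call into Cassandra.

```scala
import java.lang.management.{ManagementFactory, ThreadInfo}

object DispatcherThreadCheck {
  // (class, method) frames that indicate a pool worker parked waiting for new work.
  // Assumption: illustrative list based on the dumps above, not exhaustive.
  val idleFrames: Set[(String, String)] = Set(
    ("scala.concurrent.forkjoin.ForkJoinPool", "scan"),
    ("java.util.concurrent.ForkJoinPool", "awaitWork"),
    ("java.util.concurrent.ThreadPoolExecutor", "getTask")
  )

  // True if any frame on the thread's stack is one of the idle-worker frames.
  def isIdleWorker(info: ThreadInfo): Boolean =
    info.getStackTrace.exists(frame => idleFrames.contains((frame.getClassName, frame.getMethodName)))

  // (threadName, state, idleWorker) for every live thread whose name contains nameFragment.
  def summarize(nameFragment: String): Seq[(String, Thread.State, Boolean)] =
    ManagementFactory.getThreadMXBean()
      .dumpAllThreads(false, false)
      .toSeq
      .filter(info => info != null && info.getThreadName.contains(nameFragment))
      .map(info => (info.getThreadName, info.getThreadState, isIdleWorker(info)))
}
```

Calling `DispatcherThreadCheck.summarize("cassandra-plugin")` while the actor is stalled would show whether any plugin thread is doing something other than waiting for work.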
When I examine the memory dump and dive into EventSourced.scala, I see that
the journalBatch queue is full and the writeInProgress variable is set to
true, so I know the actor thinks a write is in progress. Correct me if I'm
wrong, but shouldn't a Cassandra write that takes more than 30 seconds trip
the CircuitBreaker in writeMessages inside AsyncWriteJournal, based on this
code?
try breaker.withCircuitBreaker(asyncWriteMessages(prep))
catch { case NonFatal(e) ⇒ Future.failed(e) }
It seems that would be one of the logical protections to put in place for a
fail-fast approach to interacting with an external repository that doesn't
return.
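The fail-fast idea behind a breaker's call timeout can be sketched with the standard library alone. Race the write's Future against a timer, and fail the returned Future if the timer wins. This is a simplified stand-in, not Akka's CircuitBreaker. It has no open or half-open state, and `FailFast.withTimeout` is a hypothetical helper. Whether Akka 2.4.11's breaker completes the returned Future on a timeout, or only records the slow call as a failure, should be checked against the akka.pattern.CircuitBreaker source for that version.

```scala
import java.util.concurrent.{ScheduledExecutorService, TimeUnit, TimeoutException}
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.concurrent.duration.FiniteDuration
import scala.util.control.NonFatal

object FailFast {
  // Hypothetical helper: completes with body's result, or fails with a
  // TimeoutException if body has not completed within `timeout`.
  def withTimeout[T](timeout: FiniteDuration, scheduler: ScheduledExecutorService)(body: => Future[T])(
      implicit ec: ExecutionContext): Future[T] = {
    val p = Promise[T]()
    // Timer side of the race: fail the promise if it is still pending when the timer fires.
    val timer = scheduler.schedule(new Runnable {
      def run(): Unit = { p.tryFailure(new TimeoutException(s"call did not complete within $timeout")); () }
    }, timeout.toMillis, TimeUnit.MILLISECONDS)
    // Same shape as the journal code above: a synchronous throw becomes a failed Future.
    val call = try body catch { case NonFatal(e) => Future.failed[T](e) }
    // Call side of the race: the first side to complete wins; tryComplete makes the loser a no-op.
    call.onComplete { result =>
      p.tryComplete(result)
      timer.cancel(false)
    }
    p.future
  }
}
```

With this wrapper, a write that never completes surfaces as a failed Future after the timeout instead of leaving the caller waiting indefinitely. A real circuit breaker also counts these failures and, after enough of them, rejects calls immediately until a reset timeout passes.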
Again, thanks for any help. I'm currently building a local version of Akka
that I can instrument to track this issue further.
-Richard
On Tuesday, January 17, 2017 at 3:06:34 AM UTC-8, Patrik Nordwall wrote:
>
> I think the bottleneck is the serialization or the writes to Cassandra,
> both happening in the Cassandra journal (akka-persistence-cassandra). Are
> you using the latest version of akka-persistence-cassandra? Such things
> should run on cassandra-plugin-default-dispatcher
> or cassandra-plugin-blocking-dispatcher
>
> The other queues are probably just symptoms of that.
>
> On Mon, Jan 16, 2017 at 9:24 AM, Richard Ney <[email protected]> wrote:
>
>> I should clarify my post. My messages aren't backing up in those three
>> queues at the same time; the problem has moved each time I've changed the
>> actor's queuing behavior. The first time I examined a heap dump, all the
>> objects were in the actor's mailbox, so I changed the mailbox to use a
>> bounded deque. The issue occurred again with memory increasing, and the
>> next dump showed all the messages queuing up in the internalStash. So I
>> changed the actor again to use 'persistAsync' instead of 'persist', and this
>> time the dump shows all the messages queuing in the journalBatch.
>>
>> So maybe my question should be: what is the message flow from the
>> journalBatch into the Cassandra message table? Is the same dispatcher
>> thread that processes messages also used to hand them to the Cassandra
>> plug-in, so that if the 'persistAsync' or receiveCommand handler takes too
>> much time processing a message, messages back up in the queue?
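The flow asked about here can be illustrated with a toy model. This is a simplified sketch, not Akka's EventSourced code. The class `ToyBatcher`, its fields, and its one-write-at-a-time rule are assumptions modeled on the journalBatch and writeInProgress fields mentioned elsewhere in this thread. Events are appended to a batch, and the batch is sent only when no write is in flight. The next flush happens only when the journal acknowledges the previous write. If that acknowledgement never arrives, the batch grows without bound. That matches the symptom of a full journalBatch with writeInProgress set to true.

```scala
// Toy model (hypothetical, not Akka's implementation) of a persistent actor's write batching.
final class ToyBatcher[E](send: Vector[E] => Unit) {
  private var journalBatch = Vector.empty[E]
  private var writeInProgress = false

  // Called for each persisted event: queue it, then try to flush.
  def persist(event: E): Unit = {
    journalBatch :+= event
    flush()
  }

  // Called when the journal acknowledges the previous write (success or failure).
  def writeCompleted(): Unit = {
    writeInProgress = false
    flush()
  }

  def pending: Int = journalBatch.size
  def inProgress: Boolean = writeInProgress

  // At most one write is outstanding; everything else waits in journalBatch.
  private def flush(): Unit =
    if (!writeInProgress && journalBatch.nonEmpty) {
      send(journalBatch)
      journalBatch = Vector.empty
      writeInProgress = true
    }
}
```

In this model, anything that eventually acknowledges the stuck write, successfully or not, lets the next batch go out. A write that never completes leaves writeInProgress stuck at true.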
>>
>> As my other message said, I'm just trying to figure out where to look next
>> in the chain, since this only happens under load.
>>
>> Regards,
>>
>> Richard
>>
>>
>> On Sunday, January 15, 2017 at 6:56:42 PM UTC-8, Richard Ney wrote:
>>>
>>> Adding to this based on my latest test runs: after several code changes,
>>> the most recent being the change from 'persist' to 'persistAsync', I've
>>> now observed my messages backing up in three different queues:
>>>
>>> mailbox --> internalStash --> journalBatch
>>>
>>> The attached bitmap shows the chain. Is this possibly a bug in the
>>> Cassandra journal plug-in, or maybe an issue with my custom serializers?
>>> Any tips on how to chase this issue?
>>>
>>> -Richard
>>>
>>> On Friday, January 13, 2017 at 12:51:10 AM UTC-8, Richard Ney wrote:
>>>>
>>>> First, thank you to the people answering questions.
>>>>
>>>> Again, first the environment:
>>>>
>>>> Akka 2.4.11
>>>> Scala 2.11.8
>>>> Cassandra 3.9
>>>>
>>>> I've been chasing an issue for days where one of my persistent actors
>>>> appears to stop processing messages, to the point where the JVM memory
>>>> grows to 8GB before it crashes with an out-of-memory error. Upon analyzing
>>>> the dump, I'm finding the internal stash overflowing with billions of
>>>> queued messages.
>>>> I've run 'jcmd <pid> Thread.print' several times to see whether the
>>>> threads are blocking in my code. The dumps reveal nothing, and I can't
>>>> find a single thread that's actually executing my code. I examined all
>>>> the akka.actor.default-dispatcher threads, which appear to be in a
>>>> waiting state:
>>>>
>>>> "manhattan-akka.actor.default-dispatcher-43" #89 prio=5 os_prio=0 tid=0x00007fa4f0007800 nid=0x97 waiting on condition [0x00007fa5a435b000]
>>>>    java.lang.Thread.State: WAITING (parking)
>>>>         at sun.misc.Unsafe.park(Native Method)
>>>>         - parking to wait for <0x00000005c0fbdf80> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
>>>>         at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
>>>>         at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>         at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>
>>>> And the Cassandra plug-in threads are also waiting:
>>>>
>>>> "manhattan-cassandra-plugin-default-dispatcher-49" #95 prio=5 os_prio=0 tid=0x00007fa504049000 nid=0x9d waiting on condition [0x00007fa52f5fa000]
>>>>    java.lang.Thread.State: WAITING (parking)
>>>>         at sun.misc.Unsafe.park(Native Method)
>>>>         - parking to wait for <0x00000005c1b65640> (a akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinPool)
>>>>         at scala.concurrent.forkjoin.ForkJoinPool.scan(ForkJoinPool.java:2075)
>>>>         at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>         at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>
>>>> I'm trying to figure out whether this is a bug where a journal write
>>>> never returns, which is all I can think of at this time. I've now set a
>>>> limit on the internal stash, but that will only prevent the OOM
>>>> exception; my actor will still stop processing data.
>>>>
>>>> Any pearls of wisdom are appreciated.
>>>>
>>>> -Richard
>>>>
>>> --
>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>> >>>>>>>>>> Check the FAQ:
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
>> ---
>> You received this message because you are subscribed to the Google Groups
>> "Akka User List" group.
>> To unsubscribe from this group and stop receiving emails from it, send an
>> email to [email protected].
>> To post to this group, send email to [email protected].
>> Visit this group at https://groups.google.com/group/akka-user.
>> For more options, visit https://groups.google.com/d/optout.
>>
>
>
>
> --
>
> Patrik Nordwall
> Akka Tech Lead
> Lightbend <http://www.lightbend.com/> - Reactive apps on the JVM
> Twitter: @patriknw
>
>