Hi All,

Thanks already for looking at my question, I really appreciate it.

@Gavin, the idea is that I'd like to use Akka for 2 reasons:
1. I think that using Akka will be an enabler for us to build a much more
reactive application than what we have today (so we can expand our Akka
usage in the future)
2. All the components I've used so far look really simple, and besides the
load issue that I'm having, they just work great.
So using an EBS isn't something we'd like to do; the idea is to be able to
switch to a full Akka world at some point in time (I've had a dream...)

@Victor, yes all this work is parallelizable, and running those heavy 
computation methods on multiple threads / nodes is the final goal.

@Guido, it's true that I didn't provide many details about how the original
application works, but it's a million-lines-of-code type of app (a big
monolith), so explaining it all would just take too long ;)
That said, let me try to give a bit more background, and a few updates
since I posted.

The original application runs as a WAR under Tomcat 7.

It uses Spring Integration to receive the JMS messages, and the new
solution I'm putting in place needs to be turned on and off based on some
configuration, so we need to be able to switch easily from the current
behaviour to the new behaviour and vice versa.
That's the reason we don't want to change the whole Spring integration that
receives the JMS message, but instead call the queueing mechanism from
the existing JMS message listener class (which will detect which
behaviour to run).
Changing how the JMS message is received is perceived as too big a change
for the first version.
In the future we will probably change that layer so that the JMS message
is handled asynchronously, but this is out of scope for now.
The JMS provider is ActiveMQ, and we're also using other technologies like
Hazelcast and JBoss jBPM.
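
The switch described above, where the existing listener either runs the current behaviour or hands the message to the queue and only returns once it is stored, could be sketched roughly like this. This is plain java.util.concurrent rather than Akka code, and every name in it (SwitchableListener, legacyHandler, enqueue, ackTimeoutMillis) is made up for illustration; in the real application the future would come from asking the MessageConsumerActor.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical stand-in for the existing JMS listener class.
final class SwitchableListener {
    private final boolean useQueueing;            // assumed to come from configuration
    private final Consumer<String> legacyHandler; // current synchronous behaviour
    // Assumed to complete once the message has been stored in the queue.
    private final Function<String, CompletableFuture<Void>> enqueue;
    private final long ackTimeoutMillis;

    SwitchableListener(boolean useQueueing,
                       Consumer<String> legacyHandler,
                       Function<String, CompletableFuture<Void>> enqueue,
                       long ackTimeoutMillis) {
        this.useQueueing = useQueueing;
        this.legacyHandler = legacyHandler;
        this.enqueue = enqueue;
        this.ackTimeoutMillis = ackTimeoutMillis;
    }

    // Returning normally means "processed"; throwing fails the JMS transaction.
    void onMessage(String payload) {
        if (!useQueueing) {
            legacyHandler.accept(payload);
            return;
        }
        try {
            // Block the JMS thread until the queue confirms, or give up.
            enqueue.apply(payload).get(ackTimeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException | ExecutionException e) {
            throw new IllegalStateException("Message was not queued", e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for the ack", e);
        }
    }

    public static void main(String[] args) {
        SwitchableListener listener = new SwitchableListener(
                true,
                m -> System.out.println("legacy: " + m),
                m -> CompletableFuture.<Void>completedFuture(null),
                500);
        listener.onMessage("hello");
        System.out.println("queued and acknowledged");
    }
}
```

The blocking wait stays confined to the JMS thread, which is the one place where the existing API forces a synchronous answer.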



On the other hand, I've been playing more with different configuration
settings, and maybe I was in fact doing something wrong!

One thing I didn't say in my original post is that I'm using
*ClusterSharding* to distribute the load across a cluster of nodes.
And with *ClusterSharding*, I've had to use MongoDB as the persistence
storage (our IT didn't give me a choice).

The current storage engine I'm using is *scullxbones/akka-persistence-mongo*,
and I was trying to set up the dispatcher based on the example in its
documentation:
https://github.com/scullxbones/akka-persistence-mongo/blob/master/docs/akka24.md

akka-contrib-persistence-dispatcher.thread-pool-executor.core-pool-size-min = 10
akka-contrib-persistence-dispatcher.thread-pool-executor.core-pool-size-factor = 10
akka-contrib-persistence-dispatcher.thread-pool-executor.core-pool-size-max = 20
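
A rough sketch of how these three settings appear to combine, assuming the pool size is ceil(available processors * factor) clamped between min and max. That rule is an assumption about Akka's thread-pool-executor sizing, not something stated in the plugin doc, and PoolSizing / scaledPoolSize are names made up for the example.

```java
// Hypothetical helper that mirrors the assumed sizing rule.
final class PoolSizing {
    // ceil(cores * factor), then clamped to the range [min, max].
    static int scaledPoolSize(int min, double factor, int max, int cores) {
        int scaled = (int) Math.ceil(cores * factor);
        return Math.min(Math.max(scaled, min), max);
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        System.out.println("cores = " + cores
                + ", core pool size = " + scaledPoolSize(10, 10.0, 20, cores));
    }
}
```

Under that assumption, min 10 / factor 10 / max 20 reaches the cap of 20 threads on any machine with two or more cores.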


Now when I look at the thread usage in jvisualvm, I can see that the
most active threads are the ones in the
*system-akka-contrib-persistence-dispatcher* thread pool.
Once I removed those settings, which were provided as an example in the
doc above, I stopped getting the timeouts.

I guess this could have been part of the problem.

Queuing my messages still takes longer than I'd planned
(considering that it just receives a message and does an insert into a mongo
collection).
Some messages are queued in 1ms, while others take about 2.5s
(the average is 325ms).
But it's not easy to see why it takes that long, and where the time is
spent (jvisualvm gives some clues, but it's far from perfect).
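
One way to narrow down where the time goes would be to time each step separately (for example the wait before the actor picks the message up, and the mongo insert itself) and look at percentiles rather than only the average. A minimal sketch, with LatencyRecorder and its methods being hypothetical names, not part of Akka or the application:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helper: collects durations for one step and summarises them.
final class LatencyRecorder {
    private final List<Long> samplesMillis = new ArrayList<>();

    synchronized void record(long millis) {
        samplesMillis.add(millis);
    }

    // Runs one step and records how long it took, even if it throws.
    void time(Runnable step) {
        long start = System.nanoTime();
        try {
            step.run();
        } finally {
            record((System.nanoTime() - start) / 1_000_000);
        }
    }

    synchronized double average() {
        return samplesMillis.stream().mapToLong(Long::longValue).average().orElse(0.0);
    }

    // Nearest-rank percentile, with p in the range (0, 100].
    synchronized long percentile(double p) {
        if (samplesMillis.isEmpty()) {
            throw new IllegalStateException("no samples recorded");
        }
        List<Long> sorted = new ArrayList<>(samplesMillis);
        Collections.sort(sorted);
        int rank = (int) Math.ceil(p / 100.0 * sorted.size());
        return sorted.get(Math.max(rank, 1) - 1);
    }

    public static void main(String[] args) {
        LatencyRecorder insertStep = new LatencyRecorder();
        for (int i = 0; i < 5; i++) {
            insertStep.time(() -> { /* the step being measured goes here */ });
        }
        System.out.println("avg=" + insertStep.average()
                + "ms, p99=" + insertStep.percentile(99) + "ms");
    }
}
```

One recorder per step makes it visible whether a few slow outliers are pulling the average up, and in which step they happen.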

Another thing I've realised is that when I tried the
*thread-pool-executor* configuration, I used the wrong keys in the
settings :facepalm:
I was using the keys from the *fork-join-executor* ... I know... it was
late and I was probably just going blind.

So all in all, I'm already getting much better performance than yesterday,
but I'm still not at the level I'd like to be.
So somehow I'd like to prioritise the work of the MessageConsumerActor over
the execution of the MessageExecutorActor.

By using different dispatchers, I was hoping (and still hope) that I'll be
able to do it nicely.
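
The effect hoped for from separate dispatchers can be illustrated with plain thread pools: if the slow subscriber work only ever runs on its own pool, the pool that acknowledges messages stays responsive even when every worker is busy. This is a sketch with java.util.concurrent, not Akka dispatchers; SeparatePools and the pool sizes are made up for the example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo: one pool for acknowledging, another for slow work.
final class SeparatePools {
    // Returns how long (in ms) an ack took while all worker threads were blocked.
    static long ackLatencyWhileWorkersBusy() {
        ExecutorService queuing = Executors.newSingleThreadExecutor(); // acknowledges messages
        ExecutorService executing = Executors.newFixedThreadPool(2);   // runs slow subscribers
        CountDownLatch release = new CountDownLatch(1);
        try {
            // Saturate the executing pool with tasks that block until released.
            for (int i = 0; i < 4; i++) {
                executing.submit(() -> { release.await(); return null; });
            }
            long start = System.nanoTime();
            // The ack runs on the other pool, so it does not wait for the workers.
            queuing.submit(() -> "queued").get(1, TimeUnit.SECONDS);
            return (System.nanoTime() - start) / 1_000_000;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the ack", e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException("ack did not arrive in time", e);
        } finally {
            release.countDown(); // let the blocked workers finish
            queuing.shutdown();
            executing.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("ack latency with busy workers: "
                + ackLatencyWhileWorkersBusy() + "ms");
    }
}
```

The isolation only holds if nothing on the acknowledging side shares threads with the slow side, which includes any blocking call (such as a persistence write) made while handling the ack.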

I'll try a few more experiments, and send some updates.

Thanks a lot for all your messages, they give me great insight into possible
next moves to improve my message handling.


On Monday, June 27, 2016 at 5:24:54 PM UTC+2, benoit heinrich wrote:
>
> Hi all,
>
> I played with akka a few years ago, and I'm just back to it,
> trying to implement some kind of internal queueing mechanism on top of a
> very big java8 application which performs a lot of message processing.
>
>
> *Background:*
>
> The current application is a message processing application which receives
> JMS messages (using AMQ) and needs to process them.
> The application has a notion of a subscriber for a given JMS message type,
> and when a message is received, all the subscribers which are interested in
> that message type are executed one after the other.
> The problem is that some of those subscribers require some kind of
> pessimistic lock on some resources, and that causes a lot of threads to
> just wait for the resource to be acquired before anything else happens.
>
>
> *Solution:*
>
> The change I've made (which uses an actor system) is that when the JMS
> message is received, it is sent to an actor (a
> MessageConsumerActor) based on the resource that needs to be locked, and
> then the actor sends the work to a child (a MessageExecutorActor) one
> message at a time to lock the resource and execute the actual subscriber
> code.
> The MessageConsumerActor then manages the queue of messages to be
> processed and gives work to the MessageExecutorActor one message at a
> time, and the MessageExecutorActor is in charge of acquiring the lock.
> Because the MessageExecutorActor / MessageConsumerActor couple is
> unique for that specific locked resource, no other process tries to lock
> that resource, so acquiring the lock is very quick as no other
> thread tries to acquire that lock at the same time.  With this the messages
> get executed nicely from the queue, one after the other.
>
> The class which receives the message from JMS unfortunately doesn't
> support asynchronous calls (it needs to fulfill an existing API).
> The API considers that once the method returns, the JMS message is
> marked as processed, and if it throws an exception, the transaction
> around the JMS message delivery fails.
> In order to make sure that I don't lose messages, when a JMS message is
> received, I return only when I get confirmation that the message has been
> queued by the MessageConsumerActor.
> Unfortunately, the only way I've found to achieve this is to use the
> Await.result() method.
>
>
> *Problem:*
>
> All this works great until I've got too many messages to process; at
> that point I get timeouts when calling Await.result() to wait for the
> MessageConsumerActor, even though that actor should be very fast to
> acknowledge the result.
>
> I think the problem might be due to those subscribers, which are in general
> quite slow to execute (from a few seconds to sometimes a minute).
> When a message is received by the MessageExecutorActor, it calls the
> subscriber in the same thread, so that thread is busy with
> some very complex computation for a few seconds.
> When a lot of messages are received, I've got the feeling that
> all the threads get busy running those subscribers, and if another JMS
> message is received during that time, I get a timeout because the
> MessageConsumerActor doesn't reply quickly enough.
>
>
> *What I tried:*
>
> I've been googling this, and I've watched a few (very interesting)
> videos, including this one:
> http://boldradius.com/blog-post/U-jexSsAACwA_8nr/dos-and-donts-when-deploying-akka-in-production
> What I get from this is that I should never call the
> Await.result() method, but then how can I get the feedback that my
> message has been queued properly from the JMS thread (the thread which has
> a transaction context associated with it)?
>
> Then the next thing I tried was to use one dispatcher for the
> MessageExecutorActor, and another one for the MessageConsumerActor.
> I was hoping this would just work by magic, but unfortunately it didn't ;)
>
> The way I've done it is by adding two new dispatcher configurations to my
> application.conf and then referencing them in the props when creating my
> actors.
> I could see using JVisualVM (and watching logs) that the threads now
> running my application code were named after the dispatchers I gave
> in the configuration.
>
> I've tried lots of different configurations, but none worked, and I'm
> starting to despair here.
>
> Here is the last configuration I tried:
>
> # Dispatcher used by the ActorSystemPublisher and MessageConsumerActor to
> # allow messages to be queued
> queuing-dispatcher {
>   type = Dispatcher
>   executor = "fork-join-executor"
>   fork-join-executor {
>     parallelism-min = 1
>     parallelism-max = 1
>   }
>   # Because a single message in general doesn't have many subscribers, a
>   # value of 20 should be enough
>   throughput = 1
> }
>
> # Dispatcher used by the MessageExecutorActor to have the subscribers
> # execute messages
> executing-dispatcher {
>   type = Dispatcher
>   executor = "thread-pool-executor"
>   thread-pool-executor {
>     parallelism-min = 1
>     parallelism-max = 4
>   }
>   # Because a single locked resource could have a lot of messages to be
>   # processed, and because processing a message could be time consuming,
>   # we want to allow as much fairness as possible for each resource to be
>   # executed.
>   throughput = 1
> }
>
>
> The queuing-dispatcher is used by the MessageConsumerActor, and the 
> executing-dispatcher is used by the MessageExecutorActor.
>
>
> As you can see I'm trying fork-join-executor and
> thread-pool-executor combinations, and all the possible variations, but
> none worked.
> I tried different parallelism settings, but none worked either.
> I tried to force a single thread for the MessageExecutorActor using
> parallelism-max=1 but that didn't work either :(
>
> Could someone please let me know how this kind of issue is usually solved?
> Am I on the right track with the dispatcher configuration?
> Do I need to use some kind of routers, and if so, how?
>
>
> Thanks in advance for all the suggestions :)
>
> Cheers,
> /Benoit
>
