Hi all, this is my case:
I have a queue (Rabbit MQ) producing Json data with the following format:
{ "id": 123, "message": "hello" }
{ "id": 876, "message": "shutdown" }
{ "id": 123, "message": "bye" }
And I have a respective Consumer. What I want to do is to distribute the
load depending on the *id* field. I mean, if the id already appeared before
I want to respect the order, if not I want to process the event in
parallel. This would be the Flow:
case class Event(id: Long, message: String)
val queue = Source(rabbitQueueConnection).mapAsync(100)(d => Future(fromJson
(d.message.body.utf8String)))
val processor = Flow[Try[Option[Event]]] filter (_.isSuccess) map (_.get)
filter (_.isDefined) map (_.get)
val out = Sink.ignore
queue ~> processor ~> out
I've been playing around
<https://github.com/gvolpe/events-processor-prototype/blob/master/src/main/scala/com/gvolpe/prototypes/processor/flows/ConsumerProcessorFlow.scala>
but I don't know how to do this.
Any ideas?
Thanks,
Gabriel.
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ:
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
To post to this group, send email to [email protected].
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.