Hi all, this is my case:

I have a queue (RabbitMQ) producing JSON data in the following format:

{ "id": 123, "message": "hello" }
{ "id": 876, "message": "shutdown" }
{ "id": 123, "message": "bye" }

I also have a corresponding consumer. I want to distribute the load based
on the *id* field: events that share an id must be processed in the order
they arrived, while events with different ids can be processed in
parallel. This is the flow so far:

case class Event(id: Long, message: String)

val queue = Source(rabbitQueueConnection)
  .mapAsync(100)(d => Future(fromJson(d.message.body.utf8String)))
val processor = Flow[Try[Option[Event]]]
  .filter(_.isSuccess).map(_.get)
  .filter(_.isDefined).map(_.get)
val out = Sink.ignore

queue ~> processor ~> out
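
To make the requirement concrete, this is roughly the behaviour I'm after,
sketched with plain Futures instead of Akka Streams. The names PerIdOrdering
and processInOrderPerId are made up for illustration, and the sketch only
handles a finite batch (the map of per-id tails would grow without bound on
a real stream), so it is not a solution for the flow itself:

```scala
import scala.concurrent.{ExecutionContext, Future}

object PerIdOrdering {
  final case class Event(id: Long, message: String)

  // Hypothetical helper, not part of Akka: handles events with the same id
  // strictly in arrival order, while different ids may run concurrently.
  def processInOrderPerId(events: Seq[Event])(handle: Event => Unit)(
      implicit ec: ExecutionContext): Future[Unit] = {
    // For every id, keep the Future of the last event scheduled for it.
    val tails = events.foldLeft(Map.empty[Long, Future[Unit]]) { (acc, e) =>
      val previous = acc.getOrElse(e.id, Future.unit)
      // Chain onto the previous event of the same id. If handle throws,
      // the chain for that id fails and its later events are skipped.
      acc.updated(e.id, previous.map(_ => handle(e)))
    }
    // Completes once the last event of every id has been handled.
    Future.sequence(tails.values.toList).map(_ => ())
  }
}
```

With the three sample events above, "hello" would always be handled before
"bye" (both have id 123), while "shutdown" (id 876) could be handled at any
point relative to them.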

I've been experimenting with this here:
<https://github.com/gvolpe/events-processor-prototype/blob/master/src/main/scala/com/gvolpe/prototypes/processor/flows/ConsumerProcessorFlow.scala>
but I don't know how to get the per-id ordering described above.

Any ideas?

Thanks,
Gabriel.
