Andrew-

There should be no problem using mapAsync in the handler since mapped 
elements will be emitted in order and, thus, joined with the proper message 
for the ack sink.
Quote from the docs: "These Futures may complete in any order, but the 
elements that are emitted downstream are in the same order as received from 
upstream."

This will allow you to parallelize any work on the messages with no change 
to the solution provided (the mapAsync is encapsulated in the Handler).

If, on the other hand, multiple logical messages are encapsulated in one 
physical message, the solution would need to be slightly different. 
However, once again, that's easily encapsulated in the handler.
For example, taking a single message and invoking several REST services in 
parallel and incorporating their responses would just be a broadcast/zip 
(or similar) flow embedded in the handler.

The point is that whatever handling is needed for each individual message 
should be fully encapsulated in the Handler, providing a clean separation 
of concerns.
Of course, you could parallelize acknowledgement as well, but that's a 
slightly different design and probably overkill, IMHO. KISS unless there's 
a genuine need to do otherwise.

Hope that helps. Feel free to sling additional questions.

Regards,
Lance

On Friday, August 21, 2015 at 12:38:58 AM UTC-4, Andrew Rollins wrote:
>
> Lance,
>
> I really appreciate your response, it's well thought out, and the code is 
> a great illustration. I have one question, I'm guessing you probably 
> already considered it so I'd like to hear your thoughts.
>
> Using your approach, what's the best way to parallelize work that the 
> handler does? Is the intention to launch several separate handleAndAckSinks 
> to achieve parallelism, or something else?
>
> I ask because the handler flow has to output one element per message in 
> order to keep the message and the Try in lock step. That means the handler 
> flow can't do something like break down the work into a bunch of smaller 
> messages, flatten that into the stream, and utilize mapAsync in that 
> flattened stream to do a bunch of parallel work (however, the handler could 
> have many steps which provide some parallelism through pipelining). I'm 
> just not familiar enough with Akka Streams to intuite the best approach for 
> parallelize the handler.
>
> Thanks,
> Andrew
>
> On Thursday, August 20, 2015 at 12:31:50 PM UTC-4, Lance Arlaus wrote:
>>
>> Andrew-
>>
>> How about using a simple branching flow with a broadcast and zip?
>>
>> The first branch carries the message to the end of the pipeline where the 
>> acknowledger receives both the message and the result of processing as a 
>> Tuple. It can then decide whether/how to acknowledge the message.
>> The second branch contains the desired business logic and produces a Try 
>> (or other monadic data type) as its result.
>> This way, the processing logic has no need to pass along the message. In 
>> fact, you could extract and deal with just the payload itself in that 
>> second branch if you'd like.
>> Since the zip waits for elements from both branches before emitting, 
>> you'll have matched (Message, Try) tuples.
>>
>> Here's a gist: 
>> https://gist.github.com/lancearlaus/e6e52fc8c7ca534cb026#file-akka-user-stream-ack-scala
>>
>> And the relevant code (the handleAndAckSink method is the key):
>>
>> case class Message[T](id: Long, body: T)
>>
>> trait Queue {
>>   def acknowledge(id: Long): Unit
>> }
>>
>>
>> type Handler[T] = Flow[Message[T], Try[_], _]
>> type AckSink = Sink[(Message[_], Try[_]), Future[_]]
>>
>> // A sink that acknowledges messages upon successful processing
>> def ackSink(queue: Queue) =
>>   Sink.foreach[(Message[_], Try[_])] {
>>     case (msg, result) => result match {
>>       case Success(_) => queue.acknowledge(msg.id)
>>       case Failure(t) => {
>>         // Do something on failure
>>         println(t)
>>       }
>>     }
>>   }
>>
>> // The flow that wraps the handler and acknowledger sink
>> def handleAndAckSink[T](handler: Handler[T], ackSink: AckSink) = 
>>   Sink(handler, ackSink, Broadcast[Message[T]](2), Zip[Message[T], 
>> Try[_]])((_, mat, _, _) => mat) {
>>     implicit b => (handler, ackSink, bcast, zip) =>
>>
>>     bcast            ~> zip.in0
>>     bcast ~> handler ~> zip.in1
>>                         zip.out ~> ackSink
>>
>>     (bcast.in)
>>   }
>>
>>
>> class AckSpec extends FlatSpec with AkkaStreamsImplicits with Matchers 
>> with ScalaFutures {
>>
>>   def testSource(n: Int) = Source((0 to n)).map(n => Message(n, s"message 
>> $n"))
>>
>>   val testQueue = new Queue {
>>     def acknowledge(id: Long) = println(s"acknowledging message $id")
>>   }
>>  
>>   val testHandler = Flow[Message[String]].map { msg => 
>>     // Randomly fail
>>     if (Random.nextBoolean) Failure(new Exception(s"failure processing 
>> message $msg"))
>>     else Success(s"success processing message $msg")
>>   }
>>
>>   "Acknowledge" should "ack messages" in {
>>
>>     val future = testSource(10).runWith(handleAndAckSink(testHandler, 
>> ackSink(testQueue)))
>>
>>     whenReady(future) { result =>
>>     }
>>
>>   }
>>
>> }
>>
>>
>>
>> Regards,
>> Lance
>>
>>
>> On Thursday, August 20, 2015 at 12:20:28 AM UTC-4, Andrew Rollins wrote:
>>>
>>> Is there an idiomatic way handle queues with Akka Streas that need to 
>>> acknowledge messages *after work is done* for a given message?
>>>
>>> This started from a thread on Twitter with Victor (
>>> https://twitter.com/viktorklang/status/634117117978107904), but it's 
>>> more appropriate to continue here. His last comment was "Sounds like you 
>>> want Flow[T, Ack[T]] such that you can close the loop at the end."
>>>
>>> I'm going to show my interpretation of that suggestion and voice my 
>>> concern with it. I'd love feedback.
>>>
>>> Assume we have an external queue service that provides messages through 
>>> a "getMessage" API. Messages can be acknowledged by calling 
>>> "ack(messageId)". After acknowledgement, a message is taken off the queue 
>>> and won't be delivered again.
>>>
>>> I'm not exactly sure how a Flow[T, Ack[T]] helps, because how is the Ack 
>>> being created from an arbitrary T? We need the original message identifier 
>>> to be passed through the stream such that we can acknowledge the message, 
>>> so we would need a flow along the lines of Flow[Msg, Ack]. In code, it 
>>> could look something like this:
>>>
>>>   trait Msg { def msgId } // incoming queue message
>>>   trait Ack               // ack result type
>>>
>>>   class FakeQueue {
>>>     def receive : Msg = ???
>>>     def ack(m: Msg) : Ack = ???
>>>   }
>>>
>>>   val queue = new FakeQueue
>>>
>>>   val msgSource: Source[Msg] =
>>>     Source.apply(() => Iterator.iterate[Msg](None)(_ => queue.receive))
>>>
>>>   val flow: Flow[Msg, Ack, Unit] = 
>>>     Flow[(Msg, String)].
>>>       map {msg => (deserialize(msg.body), msg) }.
>>>       map {case (x, msg) => (doWork(x), msg)}.
>>>       map {case (y, msg) => insertIntoDatabase(y); msg}.
>>>       map {msg => queue.ack(Msg)}
>>>
>>> This seems ok and certainly works, but I have a hangup with this. All my 
>>> intermediate steps need to passthrough the message to the end, but they 
>>> individually don't care about the message. Those stages are coupled with 
>>> some data they don't ultimately handle. I'd like to avoid that.
>>>
>>> In other words, in some hand-wavy sense, there is a desire to take a 
>>> Flow such as Flow[In, Out] which is queue agnostic and then wrap that flow 
>>> with something that will dequeue messages from a queue, push the "In" 
>>> object to the interflow, and somehow pass along the outer message such that 
>>> the message tied to an "In" pops out the other end with the associated 
>>> "Out".
>>>
>>> I'm at a loss for how to do this. Perhaps I'm looking at it wrong to 
>>> begin with. I'm hoping someone else can provide guidance.
>>>
>>

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To post to this group, send email to [email protected].
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to