Hi,
I would like confirmation about my usage of PassThroughFlow
<https://github.com/akka/akka-stream-contrib/blob/master/src/main/scala/akka/stream/contrib/PassThroughFlow.scala>.
Reading the documentation, the following quote makes me think that my
usage is wrong:
*This flow combinator is guaranteed to work correctly on flows that have
behavior of classic total functions, meaning that they should be a
one-t-one functions that don't reorder, drop, inject etc new elements.*
I'm using it in the following way:

    // K and V stand for the key and value types of the topic
    val innerFlow =
      Flow[CommittableMessage[K, V]]
        .map { msg =>
          someIO(msg) // does some IO and returns a List
        }
        .mapConcat(identity)
        .via(someLogic1)
        .via(someLogic2)

    Consumer
      .committableSource(settings, Subscriptions.topics("my-topic"))
      .via(PassThroughFlow(innerFlow))
      .map { case (committableMessage, processingResult) => ??? }

The innerFlow takes one message from Kafka and may return more than one
output, so the function is not 1:1 but 1:N. It seems to me that this is the
wrong way to use PassThroughFlow, right? Or is there something that I
didn't understand?
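
To illustrate my concern, here is a minimal sketch that uses plain Scala collections instead of Akka. It assumes, from my reading of the linked source, that PassThroughFlow pairs each output of the inner flow with an input purely by position (a broadcast followed by a zip). The names `ZipMisalignment` and `someIO` and the string messages are hypothetical stand-ins, and a real stream would backpressure rather than truncate like `List.zip` does.

```scala
// Plain-collections model of positional pairing; no Akka involved.
object ZipMisalignment {
  // Stand-ins for three Kafka messages.
  val inputs: List[String] = List("m1", "m2", "m3")

  // Hypothetical 1:N step: every message yields two results.
  def someIO(msg: String): List[String] = List(s"$msg-a", s"$msg-b")

  // What map + mapConcat would emit, in order.
  val outputs: List[String] = inputs.flatMap(someIO)

  // Positional pairing of outputs with the original inputs.
  val paired: List[(String, String)] = outputs.zip(inputs)

  def main(args: Array[String]): Unit =
    paired.foreach(println)
    // (m1-a,m1)
    // (m1-b,m2)
    // (m2-a,m3)
}
```

In this model the second result of m1 is already paired with m2, which is the kind of mismatch I expect with a 1:N inner flow.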
By the way, here is the general use case that I'm trying to solve:
- For each message from Kafka, one flow does some IO that produces a
List of N elements and returns a (CommittableOffset, List[T]).
- I would like to use mapConcat, but since I need to commit at the end of
the graph I cannot: the CommittableOffset would be the same for all
elements of the list, so I would commit the same offset multiple
times.
This is why I tried PassThroughFlow: to wrap the logic in an innerFlow
that does this job without having to take care of commits.
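
For comparison, the alternative I have in mind is to never flatten the list, so that each offset stays attached to all of its results and is committed once. The sketch below models that with plain Scala collections only; `Offset`, `Message`, `someIO`, and the upper-casing step are hypothetical stand-ins for CommittableOffset, the Kafka message, my IO call, and someLogic1/someLogic2, not the real Akka or Kafka API.

```scala
// Plain-collections model: keep the results of each message grouped with its offset.
object GroupedResults {
  final case class Offset(value: Long)                 // stand-in for CommittableOffset
  final case class Message(offset: Offset, payload: String)

  // Hypothetical IO step returning N elements per message.
  def someIO(payload: String): List[String] = List(s"$payload-a", s"$payload-b")

  val messages: List[Message] =
    List(Message(Offset(0L), "m1"), Message(Offset(1L), "m2"))

  // Process the whole list per message instead of using mapConcat,
  // so the pairing (offset, results) is never lost.
  val processed: List[(Offset, List[String])] =
    messages.map(m => m.offset -> someIO(m.payload).map(_.toUpperCase))

  // Exactly one "commit" per original message.
  val committed: List[Long] = processed.map { case (offset, _) => offset.value }

  def main(args: Array[String]): Unit =
    println(committed) // List(0, 1)
}
```

The cost of this shape is that the downstream logic has to work on a List[T] per element rather than on individual T values.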