My thought was that you might be able to achieve what you want by composing
existing stages: for example zipWithIndex followed by a map from index to key,
or statefulMapConcat to decide when to change the key, and then groupBy into a
lazy sink. I can't say for sure that it will solve your problem, but it may be
worth exploring.
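
A minimal sketch of that composition, assuming Akka 2.5 and using
Sink.lazyInit as the lazy sink. The names ChunkedUpload, chunkNumber,
sinkForChunk and maxChunks are hypothetical, and the println sink only stands
in for whatever the real per-chunk sink would be:

```scala
import akka.Done
import akka.stream.scaladsl.{RunnableGraph, Sink, Source}
import akka.util.ByteString
import scala.concurrent.Future

object ChunkedUpload {
  // Maps a zero-based part index to a one-based chunk number (the groupBy key).
  def chunkNumber(index: Long, partsPerChunk: Int): Long =
    index / partsPerChunk + 1

  // Hypothetical factory: a placeholder for a sink that depends on the chunk number.
  def sinkForChunk(chunk: Long): Sink[(ByteString, Long), Future[Done]] =
    Sink.foreach[(ByteString, Long)] { case (part, _) =>
      println(s"chunk $chunk received ${part.size} bytes")
    }

  // Tags every part with its chunk number, groups by that key, and lets each
  // substream create its own sink lazily from the first element it sees.
  def graph[M](parts: Source[ByteString, M],
               partsPerChunk: Int,
               maxChunks: Int): RunnableGraph[M] =
    parts
      .zipWithIndex
      .map { case (part, index) => (part, chunkNumber(index, partsPerChunk)) }
      .groupBy(maxChunks, _._2)
      .to(Sink.lazyInit[(ByteString, Long), Future[Done]](
        first => Future.successful(sinkForChunk(first._2)),
        () => Future.successful(Done)))
}
```

Running it would look like ChunkedUpload.graph(FileIO.fromPath(filePath,
partSize), partsPerChunk, maxChunks).run() with an implicit materializer in
scope. As far as I know, groupBy fails the stream once more than maxChunks
keys have been seen, so for an unbounded source attaching the same lazy sink
after splitAfter might be a better fit.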

--
Johan
Akka Team

On Tue, Oct 31, 2017 at 9:49 PM, Jason Steenstra-Pickens <
[email protected]> wrote:

> Hi Johan,
>
> As far as I can tell this only creates a single Sink based on the first
> element. I need a dynamic number of Sinks. In the past I have used a custom
> version of this which creates a new Sink for every element (
> OneToOneOnDemandSink <https://stackoverflow.com/a/45874809/367796>).
> Neither of these solves the case above though.
>
> I could create a variation of the OneToOneOnDemandSink called
> ManyToOneOnDemandSink that takes a predicate to determine when to create
> the next Sink; however, this is still too specialised and we lose
> composability. For example, I would need to create three variations to cover
> the built-in SubFlow API (groupBy, splitAfter, splitWhen), which is doable,
> but it pushes all the logic into the Sink and doesn't cover any
> custom SubFlows.
>
>
> Cheers,
> Jason
>
> On Tuesday, 31 October 2017 22:33:06 UTC+13, Akka Team wrote:
>>
>> Check whether Sink.lazyInit does what you want.
>>
>> --
>> Johan
>> Akka Team
>>
>> On Tue, Oct 31, 2017 at 12:59 AM, Jason Steenstra-Pickens <
>> [email protected]> wrote:
>>
>>> Hi,
>>>
>>> I seem to be encountering a recurring problem when using Akka Streams
>>> and haven't found the right client API to solve it.
>>>
>>> The problem usually translates into:
>>>
>>>    - I have some possibly infinite Source
>>>    - I want to split it into multiple inner Sources based on some
>>>    condition such as delimiter, count, or whatever
>>>    - I then want to create a Sink for each inner Source dynamically and
>>>    run each inner Flow
>>>    - I want the backpressure, errors, completion, cancellation and
>>>    stuff like that to be shared between the outer Flow and the inner Flow
>>>
>>> There are a few things that come close, but all seem to be for a slightly
>>> different use case, such as:
>>>
>>>    - splitAfter / splitWhen
>>>    - lazy / lazyInit
>>>    - the various hubs
>>>
>>> Here is a concrete example:
>>>
>>>    1. Read a file in 8KB parts
>>>    2. Split off the first 625 parts into a separate stream as a "chunk"
>>>    3. Create an HTTP source that has a URL containing the chunk number
>>>    4. Send the 625 parts to that source
>>>    5. Take the next chunk from step 2
>>>
>>> An attempt using a SubFlow looks like:
>>>     val chunkSize = fileResponse.chunkSize
>>>     val partsPerChunk = chunkSize / partSize
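>>>     // Build the (chunk, part) pairs lazily; a strict for-comprehension
>>>     // over 1 to Int.MaxValue would try to allocate the whole sequence.
>>>     val counts = Source.fromIterator { () =>
>>>       for {
>>>         chunk <- Iterator.from(1)
>>>         part <- (1 to partsPerChunk).iterator
>>>       } yield (chunk, part)
>>>     }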
>>>     val source = FileIO.fromPath(filePath, partSize)
>>>       .zip(counts)
>>>       .splitAfter({ next =>
>>>         val (_, (_, part)) = next
>>>         part == partsPerChunk
>>>       })
>>>
>>> This is quite nice but then there doesn't seem to be a way of getting to
>>> the inner Flow even if I were to create a custom Sink.
>>>
>>> It would be really awesome if SubFlow had a function like:
>>>   def to[M](f: Out => Graph[SinkShape[Out], M]): C
>>> (although probably without the materialised value since there would be
>>> multiple).
>>>
>>> Is there something obvious that I am missing?
>>>
>>>
>>> Cheers,
>>> Jason
>>>
>>> --
>>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>>> >>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
>>> ---
>>> You received this message because you are subscribed to the Google
>>> Groups "Akka User List" group.
>>> To unsubscribe from this group and stop receiving emails from it, send
>>> an email to [email protected].
>>> To post to this group, send email to [email protected].
>>> Visit this group at https://groups.google.com/group/akka-user.
>>> For more options, visit https://groups.google.com/d/optout.
