Oops, my bad. The groupWithin on the SubFlow does seem to work as expected.

Comments on the overall approach still welcomed!

There' an additional wrinkle in that I need to transform the channel id
before the write call - if I do that with a lookup, then this groupedWithin
helps because I don't have to do the lookup for each interval in the
stream, just for each batch

On the other hand, if I *start* with channels rather than intervals, I
don't need to do the lookup at all because the channel stream contains both
forms of id, but then I need to figure out how to build the streams. Take
the stream of channels from Slick and do a flatMapMerge to form a stream of
intervals? Limited concurrency in that approach, no?

On Wed, Jan 20, 2016 at 1:27 PM, Richard Rodseth <[email protected]> wrote:

> I would love to get some guidance on my first akka-streams project. To
> recap/expand, I need to copy interval data of form (channel id, start, end,
> value) from a relational db (using Slick) to Cassandra.
>
> The API to store in Cassandra takes a channel id and a batch of intervals
> for that channel. So I need to assemble batches of interval data on a
> per-channel basis.
>
> I figured that I need to a) ingest channel by channel, or b) group the
> interval stream by channel using groupBy
> If I use groupBy(_.channelId) I get a SubFlow.
> If I call groupWithin(5, 10 seconds)  on that SubFlow it doesn't seem to
> affect the substreams separately - the output is a stream of Vectors of
> length 5 but each one contains Intervals from multiple different channels.
>
>  Any ideas? Thanks.
>
>
> On Wed, Jan 6, 2016 at 3:56 PM, Richard Rodseth <[email protected]>
> wrote:
>
>> I'm considering using Slick and Akka Streams for an ETL project.
>>
>> It's basically moving intervals1 to intervals2, but intervals have a
>> channel id and some of the channel info needs to be looked up and included
>> in intervals2.
>>
>> I suppose I could do a map on Source(intervals1) and cache the looked up
>> channel info.
>>
>> But I thought I could avoid the lookup and cache if I start with the
>> stream of channels and groupBy channel, then use mapAsync. Like the
>> GroupLogFile example in akka-stream-scala.
>>
>> I also came across this example which uses a Merge, but it's just a 2-way
>> merge, whereas I have n channels.
>>
>>
>> https://github.com/typesafehub/slick-akka-webinar/blob/master/src/main/scala/EtlDemo.scala
>>
>> Any recommendations or similar ETL examples would be welcome.
>>
>> Thanks.
>>
>>
>

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To post to this group, send email to [email protected].
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to