I think what you are looking for is a Source that defines your queries,
followed by `mapAsync(3) { /* perform query */ }`, which runs up to three
queries in parallel and emits the results in the original order.

If each result of the mapAsync is a collection, you can use mapConcat to
unpack it. If each result is a stream (a Source), you can use flatMapConcat
instead.
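
A minimal sketch of that idea, assuming the Akka 2.5-era API
(ActorMaterializer). The names `queryBucket` and `OrderedParallelQueries`
are hypothetical; `queryBucket` is a stand-in for the real
`session.execute(...)` call and just fabricates three integers per hourly
bucket after a short sleep:

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}

object OrderedParallelQueries {

  // Hypothetical stand-in for session.execute(...): pretends to fetch the
  // datapoints of one hourly bucket. Thread.sleep only simulates latency;
  // a real driver call should be non-blocking.
  def queryBucket(hour: Int)(implicit ec: ExecutionContext): Future[List[Int]] =
    Future {
      Thread.sleep(10)
      List.tabulate(3)(i => hour * 100 + i)
    }

  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("ordered-queries")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    import system.dispatcher

    val done: Future[Seq[Int]] =
      Source(0 until 24)                                    // one element per bucket
        .mapAsync(parallelism = 3)(hour => queryBucket(hour)) // at most 3 queries in flight
        .mapConcat(identity)                                // unpack each List into elements
        .runWith(Sink.seq)

    try Await.result(done, 10.seconds)
    finally system.terminate()
  }
}
```

Because mapAsync emits results in the order of the upstream elements,
`OrderedParallelQueries.run()` should return the datapoints in bucket order
even though several queries overlap. The query is only started when mapAsync
pulls a bucket, which is what keeps the number of concurrent queries bounded.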

On Apr 16, 2017 11:20 AM, "Ali Hubail" <[email protected]> wrote:

Hello All,



I’m reading time series data from a database (Cassandra), but the
datapoints are partitioned across different places. For example, for 1 Hz
data, each block of 3600 datapoints (one hour) is stored in a different
partition. When I read those datapoints, I have to return them to the user
in the correct logical order: the 3600 datapoints for hour 00:00, then the
3600 datapoints for hour 01:00, and so on until hour 23:00.

The basic way of doing it is something like this:

// one source per bucket, each backed by a single query
val sources = buckets.map(bucket => {
  val future: Future[List[Any]] = session.execute(....)
  Source.fromFuture(future).mapConcat(identity)
})

// concatenate the sources in bucket order
val concat = builder.add(Concat[Any](sources.size))

sources.foreach(s => {
  ...
  s ~> ... ~> concat
})

concat ~> sink

Naturally, there is a delay in database queries. Let’s say it’s 10 ms for
each query. That means that with concat the total delay would be 24 *
10 ms = 240 ms. What I need is a hot concat with bounded parallelism. So,
currently I buffer all sources like this:

sources.foreach(s => {
  ...
  s ~> builder.add(Flow[Any].buffer(100000, OverflowStrategy.backpressure)) ~> concat
})



But the problem as you can see is that all of those queries are going to be
executed at the same time. I need to limit the number of queries executed
at any given time.

Let’s say that the limit is 3:

T0:
Source_0 executing query
Source_1 executing query
Source_2 executing query
Source_3 idle
Source_4 idle
…
Source_n idle

T1:
Source_0 finished streaming
Source_1 executing query
Source_2 executing query
Source_3 executing query
Source_4 idle
…
Source_n idle

T2:
Source_0 finished streaming
Source_1 finished streaming
Source_2 executing query
Source_3 executing query
Source_4 executing query
…
Source_n idle





Any suggestions on making concat prefetch sources with a moving window?



-- 
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups
"Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an
email to [email protected].
To post to this group, send email to [email protected].
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.
