Hi Patrik,
Yes! That worked. *flatten* was the right hint to find the right place in
the API ;)
Now it looks like this:
boxes
  .map { query =>
    source(query.polygon).fold(TrajectoryBox(Nil, query)) {
      case (box, hit) => box :+ hit.as[Trajectory]
    }
  }
  .flatten(FlattenStrategy.concat)
  .fold[TrajectoryBoxes](TrajectoryBoxes(Nil))(_ :+ _)
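For anyone reading along without the surrounding code, here is a minimal sketch of the same shape using plain Scala collections instead of streams. The case classes, their `:+` methods and `hitsFor` are hypothetical stand-ins, since the real `TrajectoryBox`, `TrajectoryBoxes` and `source` are not shown in this thread. It only illustrates how the two folds nest and that the query order is kept.

```scala
object TrajectoryBoxSketch {
  // Hypothetical stand-ins; the real types in the thread are not shown.
  final case class Trajectory(id: String)
  final case class Query(polygon: String)

  final case class TrajectoryBox(trajectories: List[Trajectory], query: Query) {
    // Append one trajectory, mirroring `box :+ hit.as[Trajectory]`.
    def :+(t: Trajectory): TrajectoryBox = copy(trajectories = trajectories :+ t)
  }

  final case class TrajectoryBoxes(boxes: List[TrajectoryBox]) {
    // Append one finished box, mirroring the outer `_ :+ _`.
    def :+(box: TrajectoryBox): TrajectoryBoxes = copy(boxes = boxes :+ box)
  }

  // Assumed in-memory replacement for `source(query.polygon)`.
  def hitsFor(polygon: String): List[Trajectory] =
    List(Trajectory(s"$polygon-1"), Trajectory(s"$polygon-2"))

  def collect(queries: List[Query]): TrajectoryBoxes =
    queries
      // Inner fold: one box per query, built hit by hit.
      .map { query =>
        hitsFor(query.polygon).foldLeft(TrajectoryBox(Nil, query))(_ :+ _)
      }
      // In the stream version each inner fold is itself a one-element
      // stream, which is why a concatenating flatten is needed here.
      // Outer fold: gather the boxes in query order.
      .foldLeft(TrajectoryBoxes(Nil))(_ :+ _)
}
```

With two queries "a" and "b", `collect` returns two boxes in that order, each holding the two trajectories produced for its polygon. As far as I know, later Akka Streams releases spell the flatten step `flatMapConcat`, but that is outside what was tested here.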
cheers,
Muki
On Tuesday, 22 September 2015 at 11:41:50 UTC+2, Patrik Nordwall wrote:
>
> Have you looked at flatten (concat)?
> /Patrik
>
> On Sun, Sep 20, 2015 at 2:57 PM, Muki <[email protected]> wrote:
>
>> Hi,
>>
>> I'm currently experimenting with elastic4s reactive streams extension
>> <https://github.com/sksamuel/elastic4s#elastic-reactive-streams> and
>> akka streams. What I want to do:
>>
>> (1) List[Polygon] ~> (2) elastic4s publisher ~> (3) List[List[Trajectory]]
>> ~> (4) algorithm
>>
>>
>> 1. I have an initial, fixed list of polygons
>> 2. For each polygon a geoshape query is performed which results in a
>> List[Trajectory]
>> 3. I get a list of lists, because each polygon produces a list
>> 4. other stuff, which is already working
>>
>>
>> My problem is that I don't know how to create a flow stage from a
>> publisher. The elastic4s API gives you one publisher per query, so I can
>> create a single publisher which returns the SearchHits for one Polygon:
>>
>> def source(polygon: Polygon[LngLat])(implicit system: ActorSystem): Source[RichSearchHit, Unit] = {
>>   val publisher = client.publisher(geoShapeQuery(polygon))
>>   Source(publisher)
>> }
>>
>>
>> Ordering is needed, although I could solve that with indices. I started
>> with the following and wonder whether creating a new source for each
>> element is the way to go:
>>
>> val searchHits: Source[Seq[RichSearchHit], Unit] =
>>   Source(List.empty[Polygon[LngLat]])
>>     .mapAsync(parallelism = 1) { polygon =>
>>       val sink = Sink.fold[Seq[RichSearchHit], RichSearchHit](Seq.empty[RichSearchHit])(_ :+ _)
>>       source(polygon).runWith(sink)
>>     }
>>
>>
>> thanks in advance,
>> Muki
>>
>> --
>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>> >>>>>>>>>> Check the FAQ:
>> http://doc.akka.io/docs/akka/current/additional/faq.html
>> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
>> ---
>> You received this message because you are subscribed to the Google Groups
>> "Akka User List" group.
>> To unsubscribe from this group and stop receiving emails from it, send an
>> email to [email protected].
>> To post to this group, send email to [email protected].
>> Visit this group at http://groups.google.com/group/akka-user.
>> For more options, visit https://groups.google.com/d/optout.
>>
>
>
>
> --
>
> Patrik Nordwall
> Typesafe <http://typesafe.com/> - Reactive apps on the JVM
> Twitter: @patriknw
>
>