Hi,
I'm currently experimenting with elastic4s reactive streams extension
<https://github.com/sksamuel/elastic4s#elastic-reactive-streams> and akka
streams. What I want to do:
(1) List[Polygon] ~> (2) elastic4s publisher ~> (3) List[List[Trajectory]]
~> (4) algorithm
1. I have an initial, fixed list of polygons
2. For each polygon a geoshape query is performed which results in a
List[Trajectory]
3. I get a list of lists, because each polygon produces a list
4. other stuff, which is already working
My problem is that I don't know how to create a flow stage from a publisher
as the elastic4s API gives you a publisher
for queries. I can create a single publisher which returns the SearchHits
for one Polygon.
def source(polygon: Polygon[LngLat])(implicit system: ActorSystem):
Source[RichSearchHit, Unit] = {
val publisher = client.publisher(geoShapeQuery(polygon))
Source(publisher)
}
Ordering is needed, however I could solve this with indices. I started with
this and wonder if this is the way to go, creating a new source for each
element:
val searchHits: Source[Seq[RichSearchHit], Unit] =
Source(List.empty[Polygon[LngLat]])
.mapAsync(parallelism = 1) { polygon =>
val sink = Sink.fold[Seq[RichSearchHit],
RichSearchHit](Seq.empty[RichSearchHit])(_ :+ _)
source(polygon).runWith(sink)
}
thanks in advance,
Muki
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ:
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
To post to this group, send email to [email protected].
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.