Hi,

I'm currently experimenting with elastic4s reactive streams extension 
<https://github.com/sksamuel/elastic4s#elastic-reactive-streams> and akka 
streams. What I want to do:

(1) List[Polygon] ~> (2) elastic4s publisher ~> (3) List[List[Trajectory]] 
~> (4) algorithm


   1. I have an initial, fixed list of polygons
   2. For each polygon a geoshape query is performed which results in a 
   List[Trajectory]
   3. I get a list of lists, because each polygon produces a list
   4. other stuff, which is already working


My problem is that I don't know how to create a flow stage from a publisher 
as the elastic4s API gives you a publisher
for queries. I can create a single publisher which returns the SearchHits 
for one Polygon.

def source(polygon: Polygon[LngLat])(implicit system: ActorSystem): 
Source[RichSearchHit, Unit] = {
  val publisher = client.publisher(geoShapeQuery(polygon))
  Source(publisher)
}


Ordering is needed, however I could solve this with indices. I started with 
this and wonder if this is the way to go, creating a new source for each 
element:

val searchHits: Source[Seq[RichSearchHit], Unit] = 
Source(List.empty[Polygon[LngLat]])
      .mapAsync(parallelism = 1) { polygon =>
        val sink = Sink.fold[Seq[RichSearchHit], 
RichSearchHit](Seq.empty[RichSearchHit])(_ :+ _)
        source(polygon).runWith(sink)
      }


thanks in advance,
Muki

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To post to this group, send email to [email protected].
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to