Hi,
I'm playing around with akka-http and the elastic4s reactive extension. My
goal is rather simple: I want to stream Elasticsearch results via a
websocket on request:
1. Open a websocket ( *wscat -c ws://localhost:9000/es/scroll* )
2. Place a query ( *>search:my search term* )
3. Always get a batch of results (e.g. 5) and fetch the next batch with
*>next*
val commandTriggeredFlow = Flow() { implicit b =>
  import FlowGraph.Implicits._

  // broadcast the command to the elasticsearch source and the tick system
  val bcast = b.add(Broadcast[Command](2))
  val zip = b.add(Zip[Command, Seq[Question]]())

  // forward the ticks
  bcast.out(0) ~> zip.in0

  // filter out the Next commands
  bcast.out(1)
    .filter(_.isInstanceOf[Search])
    .map {
      case Search(term) => query(term)
    }
    .flatten(FlattenStrategy.concat)
    // group results to get a "per next result batch"
    .grouped(2) ~> zip.in1

  (bcast.in, zip.out)
}
I had the idea to parse the commands from the websocket and only create a
new search on a search command and otherwise trigger the elasticsearch
ScrollPublisher source.
Well. That doesn't work :D
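The command parsing mentioned above could look roughly like the sketch below. Only `Search(term)` and the existence of a "Next" command are implied by the flow; modelling `Next` as a case object, the `CommandParser` name, and the exact `search:` / `next` text format are assumptions taken from the wscat steps, not the actual code.

```scala
// Hypothetical command model: Search(term) matches the pattern used in the
// flow; Next as a case object is an assumption.
sealed trait Command
final case class Search(term: String) extends Command
case object Next extends Command

// Hypothetical helper that turns a websocket text message into a Command.
object CommandParser {
  private val SearchPrefix = "search:"

  def parse(text: String): Option[Command] = {
    val trimmed = text.trim
    if (trimmed == "next") Some(Next)
    else if (trimmed.startsWith(SearchPrefix)) {
      // Everything after "search:" is treated as the search term.
      val term = trimmed.drop(SearchPrefix.length).trim
      if (term.nonEmpty) Some(Search(term)) else None
    } else None // unknown input is ignored rather than failing the stream
  }
}
```

Returning an `Option` keeps malformed input out of the stream without throwing, which seems preferable for a long-lived websocket connection.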
> search:first
< (Search(first),Vector(Question(1,First Question ever asked!,This is my first question!), Question(1,First Question ever asked!,This is my first question!)))
> next
<
>
I'm getting the results in a batch for my first search. But when I send
*next*, the stream does not seem to be triggered.
I couldn't find any examples for this :(
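To make the intended behaviour concrete, here is a stream-free sketch of "one batch per *next*" using a plain `Iterator`. It is only an illustration of the semantics being asked for, not an akka-streams solution; `BatchCursor`, `nextBatch` and the sample data are hypothetical names.

```scala
// Stream-free illustration of "hand out one batch per `next` command".
// BatchCursor is a hypothetical name, not part of akka or elastic4s.
final class BatchCursor[A](results: Iterator[A], batchSize: Int) {
  // grouped() lazily chunks the underlying iterator, so results are only
  // pulled from the source when a batch is actually requested.
  private val batches = results.grouped(batchSize)

  // Returns the next batch, or None once the results are exhausted.
  def nextBatch(): Option[Seq[A]] =
    if (batches.hasNext) Some(batches.next().toSeq) else None
}

object BatchCursorDemo {
  def main(args: Array[String]): Unit = {
    // A new cursor would be created for every `search:` command ...
    val cursor = new BatchCursor(Iterator("q1", "q2", "q3", "q4", "q5"), batchSize = 2)
    // ... and every `next` command would pull exactly one batch from it.
    println(cursor.nextBatch())
    println(cursor.nextBatch())
  }
}
```

The question is essentially how to get this pull-on-command behaviour out of the ScrollPublisher source inside a flow.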
cheers,
Muki