I'm using Akka HTTP 1.0-RC2 to build a simple streaming server
(chunked HTTP) and a client, using reactive streams / flow graphs.
My server looks like this (simplified version):
object Server extends App {
  implicit val system = ActorSystem("Server")
  implicit val ec = system.dispatcher
  val (address, port) = ("127.0.0.1", 6000)
  implicit val materializer =
    ActorFlowMaterializer(ActorFlowMaterializerSettings(system))

  // Source backed by an ActorPublisher; it emits the response chunks.
  val publisher = Source.actorPublisher(Props(new MyAwesomePublisher))

  // Answer every request on every connection with a chunked response.
  val handler = Sink.foreach[Http.IncomingConnection] { con =>
    con handleWith Flow[HttpRequest].map { req =>
      HttpResponse(200).withEntity(Chunked(`application/json`, publisher))
    }
  }

  Http().bind(address, port).to(handler).run()
}
I can now consume this stream with my Akka HTTP client implementation and
'slow down the stream' by applying backpressure. I deliberately slow down
my client-side processing to trigger the backpressure. Here's a
simplified version:
class Client(processor: ActorRef) extends Actor {
  private implicit val executionContext = context.system.dispatcher
  private implicit val flowMaterializer: FlowMaterializer =
    ActorFlowMaterializer(ActorFlowMaterializerSettings(context.system))

  val client =
    Http(context.system).outgoingConnection(host, port,
      settings = ClientConnectionSettings(context.system))

  val decompress = Flow[ByteString].map { data =>
    gunzip(data.toArray)
  }

  val buff = Flow[ByteString].buffer(1000, OverflowStrategy.backpressure)

  // Artificially slow stage: delays each element by 20 ms, one at a time.
  val slowFlow = Flow[ByteString].mapAsync(1) { x =>
    after(20 millis, context.system.scheduler)(Future.successful(x))
  }

  // For each response, run a nested graph that drains the entity via slowFlow.
  val consumer = Flow[HttpResponse].map { data =>
    FlowGraph.closed() { implicit b =>
      import FlowGraph.Implicits._
      data.entity.dataBytes ~> slowFlow ~> buff ~> Sink.ignore
    }.run()
  }

  override def receive: Receive = {
    case query: String =>
      val req = HttpRequest(GET, "http://localhost:6000/api")
        .withHeaders(
          Connection("Keep-Alive")
        )
      Source.single(req).via(client).via(consumer).to(Sink.onComplete {
        case Success(_) => println("Success!")
        case Failure(e) => println(s"Error: $e")
      }).run()
  }
}
Because of 'slowFlow', I can see that my server 'slows down the stream'
(i.e. less throughput for this connected client). So, great!
However, I wanted to handle the flow processing in another Actor, so I used
an ActorPublisher and piped the stream to it using akka.pattern.pipe:
class Client(processor: ActorRef) extends Actor {
  ...
  override def receive: Receive = {
    case query: String =>
      val req = HttpRequest(GET, endpoint)
        .withHeaders(
          `Accept-Encoding`(gzip),
          Connection("Keep-Alive")
        ) ~> authorize
      Source.single(req).via(client).runWith(Sink.head) pipeTo self
    case response: HttpResponse =>
      // Every chunk is sent to the processor actor as a plain message.
      response.entity.dataBytes.map { dataByte =>
        processor ! dataByte
      }.to(Sink.ignore).run()
  }
}
class StreamProcessor extends ActorPublisher[ByteString] with Actor {
  override def receive: Actor.Receive = {
    case data: ByteString =>
      if (isActive && totalDemand > 0)
        onNext(data)
      // otherwise there is no demand and the chunk is silently dropped
  }
}
...
// elsewhere I'm consuming this publisher
val src = Source(ActorPublisher[ByteString](streamProcessor))
FlowGraph.closed() { implicit b =>
  import FlowGraph.Implicits._
  val decompress = Flow[ByteString].map { data =>
    gunzip(data.toArray)
  }
  val buff = Flow[ByteString].buffer(1000, OverflowStrategy.backpressure)
  val slowFlow = Flow[ByteString].mapAsync(1) { x =>
    after(20 millis, context.system.scheduler)(Future.successful(x))
  }
  src ~> slowFlow ~> buff ~> Sink.ignore
}.run()
This works fine; however, in StreamProcessor (the ActorPublisher) it seems
that if I receive more data than there is demand for, the only thing I can
do is drop the messages. Can I apply backpressure here to the sender /
upstream?
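To make the problem concrete, here is a small Akka-free model of what I
believe happens in StreamProcessor. The names DemandModel, request and offer
are hypothetical stand-ins (for totalDemand, the subscriber's demand signal,
and my receive / onNext path); it is only a sketch of the bookkeeping, not
Akka's actual implementation:

```scala
// Hypothetical, Akka-free model of the demand bookkeeping in StreamProcessor.
// `request` stands in for downstream signalling demand, `offer` for receive.
final class DemandModel {
  private var totalDemand = 0L
  private var delivered = Vector.empty[String]
  private var dropped = 0

  // Downstream asks for n more elements.
  def request(n: Long): Unit = totalDemand += n

  // Mirrors `if (isActive && totalDemand > 0) onNext(data)`:
  // an element that arrives while there is no demand is simply lost.
  def offer(data: String): Unit =
    if (totalDemand > 0) {
      totalDemand -= 1
      delivered :+= data
    } else {
      dropped += 1
    }

  def deliveredElements: Vector[String] = delivered
  def droppedCount: Int = dropped
}

object DemandModelDemo {
  def run(): DemandModel = {
    val model = new DemandModel
    model.request(2)                                 // slow consumer asks for two
    (1 to 5).foreach(i => model.offer(s"chunk-$i"))  // sender pushes five anyway
    model
  }
}
```

In this model a demand of 2 with five chunks pushed delivers chunk-1 and
chunk-2 and loses the other three, because the plain `!` from the client
carries no signal back to the sender.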
Thanks for any pointers!