Hi all,

I'm running into an EntityStreamSizeException when I pipe the streaming 
response of one endpoint into the response of another. The setup is similar 
to what Mathias & Johannes presented in their talk at scalaworld: 
https://www.youtube.com/watch?v=6VBn9V3S2aQ

I'm using akka-http 2.0-M2 and have reproduced the problem in isolation. See 
the route below (the full gist is linked further down). When I call `curl -i 
http://localhost:8080/endless` the stream continues indefinitely. 
However, when I call `curl -i http://localhost:8080/pipe`, after a few 
seconds I get "curl: (56) Recv failure: Connection reset by peer" on the 
client and the exception below on the server. The source below is just an 
example to isolate the problem.

Am I doing something wrong? I would expect an endless stream with no size 
limit. I'm using Chunked as described in 
http://doc.akka.io/docs/akka-stream-and-http-experimental/snapshot/scala/http/common/http-model.html#HttpEntity

Thanks!
Jeroen

val source: Source[Int, Unit] = Source(Stream.from(1))

val route = (path("endless") & get) {
  complete {
    HttpResponse(
      entity = HttpEntity.Chunked(
        MediaTypes.`text/plain`,
        source.map(nr ⇒ ByteString((nr.toString * 10) + "\n", "UTF-8"))
      )
    )
  }
} ~
  (path("pipe") & get) {
    val s = Http().singleRequest(HttpRequest(uri = "http://localhost:8080/endless")).map {
      _.entity.dataBytes
        .via(Framing.delimiter(ByteString("\n"),
          maximumFrameLength = 10000, allowTruncation = true))
        .map(entry ⇒ entry.utf8String)
    }
    onSuccess(s) { x ⇒
      complete(HttpResponse(
        entity = HttpEntity.Chunked(
          MediaTypes.`text/plain`,
          x.map(x ⇒ ByteString(x + "----\n", "UTF-8")
          )
        )))
    }
  }



*Full gist*: https://gist.github.com/jgordijn/390c9022062cfb9fce8c

*Exception:*
[ERROR] [12/17/2015 22:06:10.493] [Test-akka.actor.default-dispatcher-4] [ActorSystem(Test)] Outgoing request stream error
EntityStreamSizeException(8388608, None)
at akka.http.scaladsl.model.HttpEntity$$anonfun$limitable$1$$anon$1.onPush(HttpEntity.scala:469)
at akka.http.scaladsl.model.HttpEntity$$anonfun$limitable$1$$anon$1.onPush(HttpEntity.scala:451)
at akka.stream.stage.AbstractStage$PushPullGraphLogic$$anon$1.onPush(Stage.scala:54)
at akka.stream.impl.fusing.GraphInterpreter.processElement$1(GraphInterpreter.scala:535)
at akka.stream.impl.fusing.GraphInterpreter.processEvent(GraphInterpreter.scala:546)
at akka.stream.impl.fusing.GraphInterpreter.execute(GraphInterpreter.scala:509)
at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$runBatch(ActorGraphInterpreter.scala:408)
at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:346)
at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
at akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:292)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

[ERROR] [12/17/2015 22:06:10.491] [Test-akka.actor.default-dispatcher-9] [ActorSystem(Test)] Outgoing response stream error
EntityStreamSizeException(8388608, None)
at akka.http.scaladsl.model.HttpEntity$$anonfun$limitable$1$$anon$1.onPush(HttpEntity.scala:469)
at akka.http.scaladsl.model.HttpEntity$$anonfun$limitable$1$$anon$1.onPush(HttpEntity.scala:451)
at akka.stream.stage.AbstractStage$PushPullGraphLogic$$anon$1.onPush(Stage.scala:54)
at akka.stream.impl.fusing.GraphInterpreter.processElement$1(GraphInterpreter.scala:535)
at akka.stream.impl.fusing.GraphInterpreter.processEvent(GraphInterpreter.scala:546)
at akka.stream.impl.fusing.GraphInterpreter.execute(GraphInterpreter.scala:509)
at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$runBatch(ActorGraphInterpreter.scala:408)
at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:346)
at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
at akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:292)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

[ERROR] [12/17/2015 22:06:10.500] [Test-akka.actor.default-dispatcher-8] [ActorSystem(Test)] Outgoing response stream error (akka.stream.StreamTcpException)
[INFO] [12/17/2015 22:06:10.502] [Test-akka.actor.default-dispatcher-3] [akka://Test/system/IO-TCP/selectors/$a/4] Message [akka.io.Tcp$ResumeReading$] from Actor[akka://Test/user/StreamSupervisor-0/StageActorRef-4] to Actor[akka://Test/system/IO-TCP/selectors/$a/4#-1123967977] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
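
A side note on the number in the exception: 8388608 is exactly 8 * 1024 * 1024, so this looks like an 8 MiB cap on the entity rather than a timeout. The sketch below is only a back-of-the-envelope check of how many lines of /endless fit under such a cap. It assumes the limit counts raw entity bytes of the /endless body as consumed by the /pipe route, which I have not confirmed. The names `SizeLimitCheck`, `lineBytes` and `firstLineOverLimit` are made up for this illustration and are not part of akka-http.

```scala
object SizeLimitCheck {
  // First argument of EntityStreamSizeException in the log: 8 MiB in bytes.
  val limitBytes: Long = 8L * 1024 * 1024

  // Bytes emitted by /endless for one number: its decimal digits repeated
  // ten times plus the trailing "\n" (all ASCII, so one byte per character).
  def lineBytes(nr: Int): Int = nr.toString.length * 10 + 1

  // Smallest nr for which the cumulative size of lines 1..nr exceeds `limit`.
  // Assumption: the check fails once the running total goes past the limit.
  def firstLineOverLimit(limit: Long): Int = {
    var total = 0L
    var nr = 0
    while (total <= limit) {
      nr += 1
      total += lineBytes(nr)
    }
    nr
  }

  def main(args: Array[String]): Unit = {
    println(s"limit = $limitBytes bytes")
    println(s"first line over the limit: ${firstLineOverLimit(limitBytes)}")
  }
}
```

If the line number printed here roughly matches the last number curl shows before the connection is reset, that would support the idea that a default entity size limit is being applied to the piped stream.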
