Hi Kyrylo,

The flow will be cancelled as the downstream is already completed.

I find this little stage useful for reasoning about what is going on.
You should be able to verify that it prints "onDownstreamFinish"
when the route times out:

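import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}

case class SnitchingStage[A](name: String) extends GraphStage[FlowShape[A, A]] {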
  val in = Inlet[A]("in")
  val out = Outlet[A]("out")
  override val shape: FlowShape[A, A] = FlowShape(in, out)
  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
    setHandler(in, new InHandler {
      override def onPush(): Unit = {
        println(s"$name: onPush")
        push(out, grab(in))
      }
      override def onUpstreamFinish(): Unit = {
        println(s"$name: onUpstreamFinish")
        super.onUpstreamFinish()
      }
      override def onUpstreamFailure(ex: Throwable): Unit = {
        println(s"$name: onUpstreamFailure: ${ex.getMessage}")
        super.onUpstreamFailure(ex)
      }
    })
    setHandler(out, new OutHandler {
      override def onPull(): Unit = {
        println(s"$name: onPull")
        pull(in)
      }
      override def onDownstreamFinish(): Unit = {
        println(s"$name: onDownstreamFinish")
        super.onDownstreamFinish()
      }
    })
  }
}
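
To see the cancellation signal in isolation, here is a minimal sketch that
runs the stage in a small stream whose downstream finishes early. It assumes
the SnitchingStage above is compiled alongside it with the akka.stream imports
it needs; the names SnitchDemo and run are made up for illustration, and
take(2) only stands in for "downstream completed first", it is not what
Akka HTTP does internally.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical demo object, not part of Akka or of the original code.
object SnitchDemo {

  // Runs ten elements towards a downstream that only wants two of them
  // and returns what the downstream actually received.
  def run(): Seq[Int] = {
    implicit val system = ActorSystem("snitch-demo")
    implicit val materializer = ActorMaterializer()
    try {
      val result = Source(1 to 10)
        .via(SnitchingStage[Int]("snitch")) // logs every signal passing through
        .take(2)                            // completes after two elements and cancels upstream
        .runWith(Sink.seq)
      Await.result(result, 5.seconds)
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit = println(run())
}
```

Among the onPull and onPush lines you should see a "snitch: onDownstreamFinish"
line once take(2) has had its two elements. The same .via(SnitchingStage(...))
call can be placed around the handler flow you pass to bindAndHandle to watch
what happens when the request times out.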


--
Johan
Akka Team

On Sat, Aug 27, 2016 at 10:25 PM, Kyrylo Stokoz <[email protected]> wrote:

> Hi all,
>
> I came across a strange issue with Akka HTTP on request timeout that I
> cannot understand. Can somebody help me with it?
>
> Consider the following code in Akka 2.4.9:
>
>
> import akka.NotUsed
> import akka.actor.ActorSystem
> import akka.http.scaladsl.Http
> import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
> import akka.http.scaladsl.server.Directives._
> import akka.stream.{ActorMaterializer, FlowShape}
> import akka.stream.scaladsl.GraphDSL.Implicits._
> import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Zip}
>
> import scala.concurrent.Future
>
> object Test extends App {
>   implicit val actorSystem = ActorSystem()
>   implicit val ec = actorSystem.dispatcher
>   implicit val materializer = ActorMaterializer()
>
>   def routes =
>     (path("test1") & get) {
>       complete("result1")
>     } ~
>     (path("test2") & get) {
>       complete {
>         Future {
>           Thread.sleep(30000)
>           "result2"
>         }
>       }
>     }
>
>   def processRequest(route: Flow[HttpRequest, HttpResponse, NotUsed]): Flow[HttpRequest, HttpResponse, NotUsed] =
>     new ExtendedFlow(route).extend()
>
>   val serverSource = Http().bindAndHandle(processRequest(routes), "0.0.0.0", port = 11011)
> }
>
> final class ExtendedFlow[A, B](originalFlow: Flow[A, B, NotUsed]) {
>
>   def extend(): Flow[A, B, NotUsed] =
>     Flow.fromGraph {
>       GraphDSL.create() { implicit builder =>
>
>         val in = builder.add(Flow[A].map { e => println("in " + e); e })
>
>         val broadcast = builder.add(Broadcast[A](2))
>         val zip       = builder.add(Zip[A, B]())
>
>         val out = builder.add(Flow[(A, B)].map { o => println("out " + o); o._2 })
>
>         in ~> broadcast; broadcast.out(0)                 ~> zip.in0
>                          broadcast.out(1) ~> originalFlow ~> zip.in1; zip.out ~> out
>
>         FlowShape(in.in, out.out)
>       }
>     }
> }
>
>
> Now if I execute `curl -v "http://localhost:11011/test1"`, I correctly see
> the 'in' and 'out' print statements in the console, and "result1" is sent
> to the user.
>
>
> My actual confusion is when I execute `curl -v
> "http://localhost:11011/test2"`.
>
> In this case, after 20s (the default request timeout in Akka HTTP),
> HttpServerBluePrint sends a 503 back to the user with a message that the
> server was not able to produce a response in time.
>
> Later, at 30s, the future completes as well. I guess its result is ignored
> because the response to the client was already handled.
>
> The question here is: what actually happens to the extended flow in this
> situation?
>
>
> I don't see any output from zip.out, though I see the 'in' statement
> printed for test2. Seeing this, I would assume either:
>
> 1. The flow would eventually get stuck, or
>
> 2. The flow would produce wrong pairs in zip, coming from the next
> request's elements.
>
>
> From my observations, the flow keeps working without any problems or
> exceptions and produces correct pairs in zip.
>
>
> Can anyone shed some light on what is actually going on here?
>
>
> Regards,
>
> Kyrylo
>
>
>
>
> --
> >>>>>>>>>> Read the docs: http://akka.io/docs/
> >>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to [email protected].
> To post to this group, send email to [email protected].
> Visit this group at https://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>



-- 
Akka Team
Lightbend <http://www.lightbend.com/> - Reactive apps on the JVM
Twitter: @akkateam

