Hi Akka Team, All,

I have a few custom stages. I recently found out that one of them, under
certain conditions, was not completing the graph stage properly, which
caused the graph to run forever.
I have timeouts attached to the graph and would expect it to fail with a
timeout, but that never happened. I created a simple reproducer (below).

I have a BrokenStage that does not complete the stage in its
onUpstreamFinish method, and a graph that has idleTimeout,
completionTimeout and a killSwitch attached. None of these has any effect
on the graph; it only fails when the Await on the future times out.

I understand that this is a developer error, and that if I move any of the
timeouts after the broken stage it works as expected.
My questions would be:

1. I would still expect the killSwitch or the completion timeout to fail
the graph. Why is that not the case?
2. How can I detect such problems and safely attach timeouts to graphs?

import akka.actor.ActorSystem
import akka.stream._
import akka.stream.scaladsl.{Keep, Sink, Source}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}

import scala.concurrent.Await
import scala.concurrent.duration._

case class BrokenStage[A](name: String) extends GraphStage[FlowShape[A, A]] {
  val in = Inlet[A]("in")
  val out = Outlet[A]("out")
  override val shape: FlowShape[A, A] = FlowShape(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      setHandler(in, new InHandler {
        override def onPush(): Unit = {
          println(s"$name: onPush")
          push(out, grab(in))
        }

        // The bug under discussion: completeStage() is never called here.
        override def onUpstreamFinish(): Unit = {
          println(s"$name: onUpstreamFinish")
        }

        // Likewise, failStage(ex) is never called, so the failure is swallowed.
        override def onUpstreamFailure(ex: Throwable): Unit = {
          println(s"$name: onUpstreamFailure: ${ex.getMessage}")
        }
      })

      setHandler(out, new OutHandler {
        override def onPull(): Unit = {
          println(s"$name: onPull")
          // Request the next element unless upstream has already finished.
          if (!isClosed(in)) pull(in)
        }

        override def onDownstreamFinish(): Unit = {
          println(s"$name: onDownstreamFinish")
        }
      })
    }
}
object Example {

  implicit val actorSystem = ActorSystem()
  implicit val ec = actorSystem.dispatcher
  implicit val materializer = ActorMaterializer()

  def main(args: Array[String]): Unit = {
    val source = Source.empty[Int]
    val sink = Sink.last[Int]

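    // Reconstructed wiring: the timeouts and the kill switch sit before the
    // broken stage. The timeout durations are illustrative assumptions.
    val (killSwitch, last) =
      source
        .idleTimeout(10.seconds)
        .completionTimeout(20.seconds)
        .viaMat(KillSwitches.single)(Keep.right)
        .via(new BrokenStage("after"))
        .toMat(sink)(Keep.both)
        .run()

    killSwitch.abort(new RuntimeException("boom!"))

    Await.result(last, 25.seconds)
  }
}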
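
For comparison, here is a minimal sketch of the variant mentioned above, where the timeout is attached after the misbehaving stage. SwallowCompletion and TimeoutAfterStage are hypothetical names used only for this illustration, and the 2 second timeout is an arbitrary assumption. Because completionTimeout now sits downstream of the stage, it never sees a completion signal, so its timer should fire and fail the materialized future.

```scala
import akka.actor.ActorSystem
import akka.stream._
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}

import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.Try

// Hypothetical stand-in for BrokenStage: it swallows upstream completion.
final class SwallowCompletion[A] extends GraphStage[FlowShape[A, A]] {
  val in: Inlet[A] = Inlet[A]("SwallowCompletion.in")
  val out: Outlet[A] = Outlet[A]("SwallowCompletion.out")
  override val shape: FlowShape[A, A] = FlowShape(in, out)

  override def createLogic(attrs: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      setHandler(in, new InHandler {
        override def onPush(): Unit = push(out, grab(in))
        // Deliberately does not call completeStage().
        override def onUpstreamFinish(): Unit = ()
      })
      setHandler(out, new OutHandler {
        // Only pull while the inlet is still open.
        override def onPull(): Unit = if (!isClosed(in)) pull(in)
      })
    }
}

object TimeoutAfterStage {
  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem()
    implicit val materializer: ActorMaterializer = ActorMaterializer()

    val last = Source.empty[Int]
      .via(new SwallowCompletion[Int])
      .completionTimeout(2.seconds) // assumption: timeout placed downstream of the stage
      .runWith(Sink.last)

    // Should print a Failure wrapping a timeout exception once the timer fires.
    println(Try(Await.result(last, 25.seconds)))
    system.terminate()
  }
}
```

The stage itself is fixed by calling completeStage() in onUpstreamFinish and failStage(ex) in onUpstreamFailure, which is what the default InHandler implementations do when those methods are not overridden.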


