Hi, The explanation is that you use Source.empty which immediately completes the stream. Since you overrode onUpstreamFinish() to ignore this completion, it just gets stuck. Below your broken stage there are no timeouts (timeouts don't apply to a whole stream, only to that exact communication point where they are injected).
So the events are: 1. Source.empty completes stream 2. Completion is stuck at BrokenStage 3. KillSwitch abort arrives in emptyness as stream is no longer running on that upper half, only below BrokenStage 4. Sink.last times out since no element is present and nothing is completed If you replace Source.empty with Source.maybe (which does not immediately close the stream, only once its materialized Promise is completed with a None). Once you done that, the Await.result will correctly throw the RuntimeException("boom"). So everything works as designed :D -Endre On Tue, Oct 18, 2016 at 9:43 PM, Kyrylo Stokoz <k.sto...@gmail.com> wrote: > Hi Akka Team, All, > > I have few custom stages, recently i find out that one of them under > certain conditions was not properly completing graph stage, which was > causing graph to run forever. > I have timeouts attached to graph and would expect it to fail with timeout > but it never happened. I created a simple reproducer (below). > > I have a BrokenStage that is not completing stage in onUpstreamFinish method > and i have a graph which has idleTimeout, completionTimeout and killSwitch > attached. None of this has an effect on the graph it fails with future > timeout. > > I understand that this is developer issue and if i move any of timeouts > after broken stage it will work as expected. > My questions would be: > > 1. I would still expect killSwitch or completion timeout to fail the > graph, why it is not a case? > 2. how i can detect such things / safely attach timeouts to graphs? > > Reproducer: > import akka.actor.ActorSystem > import akka.stream._ > import akka.stream.scaladsl.{Keep, Sink, Source} > import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler} > > import scala.concurrent.Await > import scala.concurrent.duration._ > > > case class BrokenStage[A](name: String) extends GraphStage[FlowShape[A, A]] { > val in = Inlet[A]("in") > val out = Outlet[A]("out") > override val shape: FlowShape[A, A] = FlowShape(in, out) > > override def createLogic(inheritedAttributes: Attributes): GraphStageLogic > = new GraphStageLogic(shape) { > setHandler(in, new InHandler { > override def onPush(): Unit = { > println(s"$name: onPush") > push(out, grab(in)) > } > override def onUpstreamFinish(): Unit = { > println(s"$name: onUpstreamFinish") > } > override def onUpstreamFailure(ex: Throwable): Unit = { > println(s"$name: onUpstreamFailure: ${ex.getMessage}") > super.onUpstreamFailure(ex) > } > }) > setHandler(out, new OutHandler { > override def onPull(): Unit = { > println(s"$name: onPull") > pull(in) > } > override def onDownstreamFinish(): Unit = { > println(s"$name: onDownstreamFinish") > super.onDownstreamFinish() > } > }) > } > } > > object Example { > > implicit val actorSystem = ActorSystem() > implicit val ec = actorSystem.dispatcher > implicit val materializer = ActorMaterializer() > > def main(args: Array[String]): Unit = { > val source = Source.empty[Int] > val sink = Sink.last[Int] > > val (killSwitch, last) = > source > .idleTimeout(10.second) > .completionTimeout(20.seconds) > .viaMat(KillSwitches.single)(Keep.right) > .via(new BrokenStage("after")) > .toMat(sink)(Keep.both) > .run() > > Thread.sleep(1000) > killSwitch.abort(new RuntimeException("boom!")) > > Await.result(last, 25.second) > sys.exit(-1) > } > > } > > -- > >>>>>>>>>> Read the docs: http://akka.io/docs/ > >>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/ > current/additional/faq.html > >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user > --- > You received this message because you are subscribed to the Google Groups > "Akka User List" group. > To unsubscribe from this group and stop receiving emails from it, send an > email to akka-user+unsubscr...@googlegroups.com. > To post to this group, send email to akka-user@googlegroups.com. > Visit this group at https://groups.google.com/group/akka-user. > For more options, visit https://groups.google.com/d/optout. > -- Akka Team Lightbend <http://www.lightbend.com/> - Reactive apps on the JVM Twitter: @akkateam -- >>>>>>>>>> Read the docs: http://akka.io/docs/ >>>>>>>>>> Check the FAQ: >>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups "Akka User List" group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at https://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.