Hi,

The explanation is that you use Source.empty which immediately completes
the stream. Since you overrode onUpstreamFinish() to ignore this
completion, it just gets stuck. Below your broken stage there are no
timeouts (timeouts don't apply to a whole stream, only to that exact
communication point where they are injected).

So the events are:
 1. Source.empty completes stream
 2. Completion is stuck at BrokenStage
 3. KillSwitch abort arrives in emptyness as stream is no longer running on
that upper half, only below BrokenStage
 4. Sink.last times out since no element is present and nothing is completed

If you replace Source.empty with Source.maybe (which does not immediately
close the stream, only once its materialized Promise is completed with a
None).

Once you done that, the Await.result will correctly throw the
RuntimeException("boom").

So everything works as designed :D

-Endre


On Tue, Oct 18, 2016 at 9:43 PM, Kyrylo Stokoz <k.sto...@gmail.com> wrote:

> Hi Akka Team, All,
>
> I have few custom stages, recently i find out that one of them under
> certain conditions was not properly completing graph stage, which was
> causing graph to run forever.
> I have timeouts attached to graph and would expect it to fail with timeout
> but it never happened. I created a simple reproducer (below).
>
> I have a BrokenStage that is not completing stage in onUpstreamFinish method
> and i have a graph which has idleTimeout, completionTimeout and killSwitch
> attached. None of this has an effect on the graph it fails with future
> timeout.
>
> I understand that this is developer issue and if i move any of timeouts
> after broken stage it will work as expected.
> My questions would be:
>
> 1. I would still expect killSwitch or completion timeout to fail the
> graph, why it is not a case?
> 2. how i can detect such things / safely attach timeouts to graphs?
>
> Reproducer:
> import akka.actor.ActorSystem
> import akka.stream._
> import akka.stream.scaladsl.{Keep, Sink, Source}
> import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
>
> import scala.concurrent.Await
> import scala.concurrent.duration._
>
>
> case class BrokenStage[A](name: String) extends GraphStage[FlowShape[A, A]] {
>   val in = Inlet[A]("in")
>   val out = Outlet[A]("out")
>   override val shape: FlowShape[A, A] = FlowShape(in, out)
>
>   override def createLogic(inheritedAttributes: Attributes): GraphStageLogic 
> = new GraphStageLogic(shape) {
>     setHandler(in, new InHandler {
>       override def onPush(): Unit = {
>         println(s"$name: onPush")
>         push(out, grab(in))
>       }
>       override def onUpstreamFinish(): Unit = {
>         println(s"$name: onUpstreamFinish")
>       }
>       override def onUpstreamFailure(ex: Throwable): Unit = {
>         println(s"$name: onUpstreamFailure: ${ex.getMessage}")
>         super.onUpstreamFailure(ex)
>       }
>     })
>     setHandler(out, new OutHandler {
>       override def onPull(): Unit = {
>         println(s"$name: onPull")
>         pull(in)
>       }
>       override def onDownstreamFinish(): Unit = {
>         println(s"$name: onDownstreamFinish")
>         super.onDownstreamFinish()
>       }
>     })
>   }
> }
>
> object Example {
>
>   implicit val actorSystem = ActorSystem()
>   implicit val ec = actorSystem.dispatcher
>   implicit val materializer = ActorMaterializer()
>
>   def main(args: Array[String]): Unit = {
>     val source = Source.empty[Int]
>     val sink = Sink.last[Int]
>
>     val (killSwitch, last) =
>       source
>         .idleTimeout(10.second)
>         .completionTimeout(20.seconds)
>         .viaMat(KillSwitches.single)(Keep.right)
>         .via(new BrokenStage("after"))
>         .toMat(sink)(Keep.both)
>         .run()
>
>     Thread.sleep(1000)
>     killSwitch.abort(new RuntimeException("boom!"))
>
>     Await.result(last, 25.second)
>     sys.exit(-1)
>   }
>
> }
>
> --
> >>>>>>>>>> Read the docs: http://akka.io/docs/
> >>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/
> current/additional/faq.html
> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to akka-user+unsubscr...@googlegroups.com.
> To post to this group, send email to akka-user@googlegroups.com.
> Visit this group at https://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>



-- 
Akka Team
Lightbend <http://www.lightbend.com/> - Reactive apps on the JVM
Twitter: @akkateam

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to