I'm trying to implements a way to control a flow (start/stop), nothing was 
implemented yet in the core of akka-stream

My current implementation looks like this.

trait ValveSwitch {
  def open: Unit
  def close: Unit

class Valve[A](mode: ValveMode = ValveMode.Open) extends 
GraphStageWithMaterializedValue[FlowShape[A, A], ValveSwitch] {

  override val shape = FlowShape(Inlet[A]("valve.in"), 

  override def createLogicAndMaterializedValue(inheritedAttributes: 
Attributes): (GraphStageLogic, ValveSwitch) = {
    val logic = new ValveGraphStageLogic(shape, mode)
    (logic, logic.switch)

  private class ValveGraphStageLogic(shape: Shape, var mode: ValveMode) 
extends GraphStageLogic(shape){
    import shape._

    var bufferedElement = List.empty[A]

    val switch = new ValveSwitch {
      override def open: Unit = {
        mode = ValveMode.Open
        println(s"pushing $bufferedElement, out is available ? 
        bufferedElement.foreach(push(out, _))
        bufferedElement = List.empty

      override def close: Unit = {
        mode = ValveMode.Closed

    setHandler(in, new InHandler {
      override def onPush(): Unit = {
        val element = grab(in) //acquires the element that has been 
received during an onPush
        println(s"${mode} on push called with $element")
        if (mode == ValveMode.Open) {
          push(out, element) //push directly the element on the out port
        } else {
          bufferedElement = bufferedElement :+ element

    setHandler(out, new OutHandler {
      override def onPull(): Unit = {
        println("on pull called")
        pull(in) //request the next element on in port

trait ValveMode

object ValveMode {
  case object Open extends ValveMode
  case object Closed extends ValveMode


My current unit test is failing. due to the fact when i open the valve, i 
never received the previous message.
It seems even if i push the element through ( valve.open ) the sink never 
receive the element

class ValveSpec extends FlatSpec {

  implicit val system = ActorSystem()
  implicit val materializer = ActorMaterializer()
  implicit val executionContext = materializer.executionContext

  "A closed valve" should "emit only 3 elements after it has been open" in {
    val (valve, probe) = Source(1 to 3)
      .viaMat(new Valve(ValveMode.Closed))(Keep.right)



    probe.expectNext(2, 3)


Here the gist 

