Hi,

I'm currently trying to implement a valve graph stage to manage pause/resume. 
The behavior of the stage is controlled through its materialized value:

trait ValveSwitch {
  def open: Unit
  def close: Unit
}


Current implementation of the valve:

class Valve[A](mode: ValveMode = ValveMode.Open)
  extends GraphStageWithMaterializedValue[FlowShape[A, A], ValveSwitch] {

  override val shape = FlowShape(Inlet[A]("valve.in"), Outlet[A]("valve.out"))

  override def createLogicAndMaterializedValue(
      inheritedAttributes: Attributes): (GraphStageLogic, ValveSwitch) = {
    val logic = new ValveGraphStageLogic(shape, mode)
    (logic, logic.switch)
  }

}


The current implementation is pretty simple: each time onPull is called, we 
request the next element by calling pull(in).
When onPush is called, we check the current mode:
- if Open, we apply the default behavior and call push(out, element)
- if Closed, we put the element into a buffer

private class ValveGraphStageLogic(shape: Shape, var mode: ValveMode)
  extends GraphStageLogic(shape) {
  import shape._

  var bufferedElement = List.empty[A]

  val switch = new ValveSwitch {
    override def open: Unit = {
      mode = ValveMode.Open
      println(s"pushing $bufferedElement, out is available ? ${isAvailable(out)}")

      bufferedElement.foreach(push(out, _))
      bufferedElement = List.empty
    }

    override def close: Unit = {
      mode = ValveMode.Closed
    }
  }

  setHandler(in, new InHandler {
    override def onPush(): Unit = {
      val element = grab(in) // acquires the element received during onPush
      println(s"${mode} on push called with $element")
      if (mode == ValveMode.Open) {
        push(out, element) //push directly the element on the out port
      } else {
        bufferedElement = bufferedElement :+ element
      }
    }
  })

  setHandler(out, new OutHandler {
    override def onPull(): Unit = {
      println("on pull called")
      pull(in) //request the next element on in port
    }
  })
}


When we resume the valve by calling switch.open, we push the buffered 
elements:

override def open: Unit = {

  mode = ValveMode.Open
  println(s"pushing $bufferedElement, out is available ? ${isAvailable(out)}")

  bufferedElement.foreach(push(out, _))
  bufferedElement = List.empty
}


The current test is failing:

"A closed valve" should "emit only 3 elements after it has been open" in {
  
    val (valve, probe) = Source(1 to 5)
    // the valve starts closed here, so it should not push any message
    .viaMat(new Valve(ValveMode.Closed))(Keep.right)
    .toMat(TestSink.probe[Int])(Keep.both)
    .run()

  probe.request(2)
  probe.expectNoMsg()

  valve.open // opening the valve should emit the buffered elements


  probe.expectNext shouldEqual 1 //we never receive the element
  probe.expectNext shouldEqual 2

  probe.request(3)
  probe.expectNext shouldEqual 3
  probe.expectNext shouldEqual 4
  probe.expectNext shouldEqual 5

  probe.expectComplete()
}


Here is the console log:

on pull called
Closed on push called with 1
pushing Some(1), out is available ? true

Expected OnNext(_), yet no element signaled during 3 seconds
java.lang.AssertionError: Expected OnNext(_), yet no element signaled during 3 seconds
    at akka.stream.testkit.TestSubscriber$ManualProbe.expectNext(StreamTestKit.scala:268)
    at akka.stream.testkit.TestSubscriber$ManualProbe.expectNext(StreamTestKit.scala:259)
    at com.omsignal.omrun.orchestration.rest.ValveSpec$$anonfun$1.apply(ValveSpec.scala:44)
    at com.omsignal.omrun.orchestration.rest.ValveSpec$$anonfun$1.apply(ValveSpec.scala:34)


I suspect the current code has an issue when we resume the valve: the push 
does not seem to take effect, even though isAvailable(out) reports true.

val switch = new ValveSwitch {
    override def open: Unit = {
      mode = ValveMode.Open
      println(s"pushing $bufferedElement, out is available ? ${isAvailable(out)}")

      bufferedElement.foreach(push(out, _))
      bufferedElement = Option.empty
    }

    override def close: Unit = {
      mode = ValveMode.Closed
    }
  }
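
One direction that might be worth exploring, given as an unverified sketch rather than a confirmed fix: the switch above is called from outside the stage (the test thread), while GraphStageLogic documents that port operations such as push are only safe from inside the stage's own callbacks. Akka provides getAsyncCallback for handing external events to a stage. The names AsyncValve, Logic, held and modeChange below are hypothetical, and the ValveMode definition is an assumption since it is not shown in this post. The sketch also assumes a single held element is enough, because pull(in) is only issued once per downstream demand.

```scala
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.stage.{GraphStageLogic, GraphStageWithMaterializedValue, InHandler, OutHandler}

// Assumed definition: ValveMode is not shown in the post.
sealed trait ValveMode
object ValveMode {
  case object Open extends ValveMode
  case object Closed extends ValveMode
}

trait ValveSwitch {
  def open: Unit
  def close: Unit
}

// Hypothetical variant of the valve that routes open/close through an async callback.
class AsyncValve[A](initialMode: ValveMode = ValveMode.Open)
    extends GraphStageWithMaterializedValue[FlowShape[A, A], ValveSwitch] {

  private val in = Inlet[A]("valve.in")
  private val out = Outlet[A]("valve.out")
  override val shape: FlowShape[A, A] = FlowShape(in, out)

  private final class Logic extends GraphStageLogic(shape) with InHandler with OutHandler {
    private var mode: ValveMode = initialMode
    // At most one element can be waiting: it arrived for an unanswered downstream pull.
    private var held: Option[A] = None

    // The body of the callback runs inside the stage, so push is allowed here.
    private val modeChange = getAsyncCallback[ValveMode] { newMode =>
      mode = newMode
      if (mode == ValveMode.Open) {
        held.foreach { element =>
          held = None
          push(out, element)
          // Upstream may have finished while the element was held back.
          if (isClosed(in)) completeStage()
        }
      }
    }

    // invoke only enqueues the mode change, so it can be called from any thread.
    val switch: ValveSwitch = new ValveSwitch {
      override def open: Unit = modeChange.invoke(ValveMode.Open)
      override def close: Unit = modeChange.invoke(ValveMode.Closed)
    }

    override def onPush(): Unit = {
      val element = grab(in)
      if (mode == ValveMode.Open) push(out, element)
      else held = Some(element)
    }

    // Do not complete while an element is still held back.
    override def onUpstreamFinish(): Unit =
      if (held.isEmpty) completeStage()

    override def onPull(): Unit = pull(in)

    setHandlers(in, out, this)
  }

  override def createLogicAndMaterializedValue(
      inheritedAttributes: Attributes): (GraphStageLogic, ValveSwitch) = {
    val logic = new Logic
    (logic, logic.switch)
  }
}
```

With this shape, valve.open in the test would only enqueue the mode change, and the held element would be pushed by the stage itself. Whether invoke is safe immediately after run() may depend on the Akka version in use.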


There is definitely something I'm not getting; if anyone could help me see 
the light...

Here is the gist:
https://gist.github.com/regis-leray/013dfe030159bcd890ca0d5cd440c938

Any help would be appreciated
