Hi,
Im currently trying to implement a valve Graph to manage pause/resume. We
can control the behavior of the graph by using the MaterializeValue
trait ValveSwitch {
def open: Unit
def close: Unit
}
Current implementation of the valve
class Valve[A](mode: ValveMode = ValveMode.Open) extends
GraphStageWithMaterializedValue[FlowShape[A, A], ValveSwitch] {
override val shape = FlowShape(Inlet[A]("valve.in"), Outlet[A]("valve.out"))
override def createLogicAndMaterializedValue(inheritedAttributes:
Attributes): (GraphStageLogic, ValveSwitch) = {
val logic = new ValveGraphStageLogic(shape, mode)
(logic, logic.switch)
}
}
The current implementation is pretty simple, each time we are receiving a
onPull demand we are requesting by doing pull(in).
When a onPush demand is received we are checking the current state
- if Open we are doing the default behavior by doing push(out,element)
- if Close we are putting the element into a queue
private class ValveGraphStageLogic(shape: Shape, var mode: ValveMode) extends
GraphStageLogic(shape){
import shape._
var bufferedElement = List.empty[A]
val switch = new ValveSwitch {
override def open: Unit = {
mode = ValveMode.Open
println(s"pushing $bufferedElement, out is available ?
${isAvailable(out)}")
bufferedElement.foreach(push(out, _))
bufferedElement = List.empty
}
override def close: Unit = {
mode = ValveMode.Closed
}
}
setHandler(in, new InHandler {
override def onPush(): Unit = {
val element = grab(in) //acquires the element that has been received
during an onPush
println(s"${mode} on push called with $element")
if (mode == ValveMode.Open) {
push(out, element) //push directly the element on the out port
} else {
bufferedElement = bufferedElement :+ element
}
}
})
setHandler(out, new OutHandler {
override def onPull(): Unit = {
println("on pull called")
pull(in) //request the next element on in port
}
})
}
When we are resuming the valve my using the switch.open, we are pushing the
element
override def open: Unit = {
mode = ValveMode.Open
println(s"pushing $bufferedElement, out is available ? ${isAvailable(out)}")
bufferedElement.foreach(push(out, _))
bufferedElement = List.empty
}
The Current test is failing
"A closed valve" should "emit only 3 elements after it has been open" in {
val (valve, probe) = Source(1 to 5)
.viaMat(new Valve(ValveMode.Closed))(Keep.right) //the current valve by
default is closed, dont push any message
.toMat(TestSink.probe[Int])(Keep.both)
.run()
probe.request(2)
probe.expectNoMsg()
valve.open //open the valve should emit the previous
probe.expectNext shouldEqual 1 //we never receive the element
probe.expectNext shouldEqual 2
probe.request(3)
probe.expectNext shouldEqual 3
probe.expectNext shouldEqual 4
probe.expectNext shouldEqual 5
probe.expectComplete()
}
Here the console log
on pull called
Closed on push called with 1
pushing Some(1), out is available ? true
Expected OnNext(_), yet no element signaled during 3 seconds
java.lang.AssertionError: Expected OnNext(_), yet no element signaled during 3
seconds
at
akka.stream.testkit.TestSubscriber$ManualProbe.expectNext(StreamTestKit.scala:268)
at
akka.stream.testkit.TestSubscriber$ManualProbe.expectNext(StreamTestKit.scala:259)
at
com.omsignal.omrun.orchestration.rest.ValveSpec$$anonfun$1.apply(ValveSpec.scala:44)
at
com.omsignal.omrun.orchestration.rest.ValveSpec$$anonfun$1.apply(ValveSpec.scala:34)
I'm suspecting the current code to have an issue when we are resuming the
valve, it doesnt seems the push really works
val switch = new ValveSwitch {
override def open: Unit = {
mode = ValveMode.Open
println(s"pushing $bufferedElement, out is available ?
${isAvailable(out)}")
bufferedElement.foreach(push(out, _))
bufferedElement = Option.empty
}
override def close: Unit = {
mode = ValveMode.Closed
}
}
There is definitively something i dont catch up, if anyone could help me to
see some light....
Here the gist
https://gist.github.com/regis-leray/013dfe030159bcd890ca0d5cd440c938
Any help would be appreciated
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ:
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
To post to this group, send email to [email protected].
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.