Hello,
I seem to be having trouble with switching states in my FlexiRoute.
This code is quite contrived, but the gist is that the router receives
different types of messages: one type of message can go to a single outlet
(Single), while the other must go to all outlets (Broadcast). If the router is
in the DemandFromAny state and receives a Broadcast, it would buffer that
message and switch to a DemandFromAll state; once it got a message of any kind,
it would release its buffer to all outlets and then determine whether it should
stay in DemandFromAll or return to the DemandFromAny state. For simplicity's
sake I have omitted the buffering part. Running the code below with debug
logging enabled produced the following output:
fail due to: requirement failed: emit to [out1] not allowed when no demand
> available
>
Please let me know if I am misunderstanding something or if I have a bug in
my code.
Here is a quick test to replicate the behavior:
The flexiRoute impl:
package com.protectwise.streamy.stage
import akka.stream._
import akka.stream.scaladsl.FlexiRoute
/**
* Created by ben on 7/2/15.
*/
object TheFlexiRoute {
  // Message protocol routed by the TheFlexiRoute stage.
  // The trait is sealed so that every `match` over a RouteMessage is checked
  // for exhaustiveness by the compiler instead of failing with a MatchError
  // at runtime if a new message kind is added.
  sealed trait RouteMessage

  /** A message that is emitted to a single outlet. */
  case object Single extends RouteMessage

  /** A message that is emitted to every outlet. */
  case object Broadcast extends RouteMessage
}
/**
 * Fan-out stage that routes [[TheFlexiRoute.RouteMessage]]s over `outs` outlets.
 *
 * The route logic switches between two states:
 *  - `normalState` (the initial state) uses a `DemandFromAny` condition over all
 *    outlets. A `Single` is emitted to the outlet handed to the callback; a
 *    `Broadcast` is not emitted and only switches to `broadcastState`.
 *  - `broadcastState` uses a `DemandFromAll` condition over all outlets. A
 *    `Broadcast` is emitted to every outlet; a `Single` is not emitted and only
 *    switches back to `normalState`.
 *
 * @param outs number of outlets to create; they are named `out0`, `out1`, ...
 */
class TheFlexiRoute(outs:Int) extends
  FlexiRoute[TheFlexiRoute.RouteMessage,
    UniformFanOutShape[TheFlexiRoute.RouteMessage, TheFlexiRoute.RouteMessage]](
  shape = UniformFanOutShape(
    inlet = new Inlet[TheFlexiRoute.RouteMessage]("flexIn"),
    outlets = (for (index <- 0 until outs) yield {
      println(s"Creating out-$index")
      new Outlet[TheFlexiRoute.RouteMessage](s"out$index")
    }): _*
  ),
  attributes = Attributes.name("streamyBalance")
) {
  import FlexiRoute._

  /** Builds the two-state route logic for the given shape instance. */
  override def createRouteLogic(p: PortT) = new RouteLogic[TheFlexiRoute.RouteMessage] {

    // State driven by a DemandFromAll condition: the callback receives no
    // specific outlet, so a Broadcast is emitted to each outlet of the shape.
    val broadcastState = State(DemandFromAll(p.outlets)) { (context, _, message) =>
      message match {
        case TheFlexiRoute.Single =>
          // The Single itself is not emitted here; only the state changes.
          normalState
        case TheFlexiRoute.Broadcast =>
          for (outlet <- p.outArray) context.emit(outlet)(message)
          SameState
      }
    }

    // State driven by a DemandFromAny condition: the callback is handed one
    // outlet, which receives the Single message.
    val normalState: State[OutPort] = State(DemandFromAny(p.outlets)) { (context, readyOut, message) =>
      message match {
        case TheFlexiRoute.Single =>
          // The condition hands back an untyped OutPort; emit needs a typed Outlet.
          val target = readyOut.asInstanceOf[Outlet[TheFlexiRoute.RouteMessage]]
          context.emit(target)(message)
          SameState
        case TheFlexiRoute.Broadcast =>
          // The Broadcast itself is not emitted here; only the state changes.
          broadcastState
      }
    }

    override def initialState = normalState
  }
}
And then here is my test code (ScalaTest):
package com.protectwise.streamy.stage
import akka.testkit.{TestProbe, TestKit}
import com.protectwise.logging.Logging
import com.protectwise.streamy.message.{Tick, SourceId, Message, Tuple}
import com.typesafe.config.{ConfigFactory, Config}
import org.scalatest.{Matchers, FlatSpecLike}
import akka.actor.ActorSystem
import akka.stream.scaladsl._
import akka.stream.testkit.{TestSubscriber, StreamTestKit, TestPublisher}
import akka.stream._
import akka.stream.testkit.scaladsl.{TestSink, TestSource}
/**
* Created by ben on 7/2/15.
*/
class TheFlexiRouteTest extends TestKit(ActorSystem("Streamy")) with
  FlatSpecLike with Matchers with org.scalatest.BeforeAndAfterAll with Logging {

  val srcId = SourceId("test",0)

  // The ActorSystem is created in the constructor; shut it down once the
  // suite has finished so its threads do not outlive the test run.
  override protected def afterAll(): Unit = {
    TestKit.shutdownActorSystem(system)
    super.afterAll()
  }

  "TheFlexiRoute" should "Route messages correctly based on type" in {
    import scala.concurrent.Await
    import scala.concurrent.duration.DurationInt
    import scala.util.control.NonFatal

    try {
      implicit val materializer: Materializer = ActorMaterializer()

      val src = TestSource.probe[TheFlexiRoute.RouteMessage]

      // Prints every element that reaches the end of the graph; its
      // materialized Future is used below to wait for the stream to finish.
      val testSink = Sink.foreach[TheFlexiRoute.RouteMessage]{m =>
        println(s"Sink gets: $m")
      }

      // Pass-through flow that logs which router outlet an element came from.
      def testFlow(i: Int) = Flow.apply[TheFlexiRoute.RouteMessage].map { m =>
        println(s"Messages-$i: ${m}")
        m
      }

      // router(2 outlets) -> one logging flow per outlet -> merge
      val flow = Flow() { implicit b =>
        import FlowGraph.Implicits._
        val router = b.add(new TheFlexiRoute(2))
        val merge = b.add(Merge[TheFlexiRoute.RouteMessage](2))
        router ~> testFlow(0) ~> merge
        router ~> testFlow(1) ~> merge
        (router.in, merge.out)
      }

      // Keep both materialized values: the publisher probe drives the input,
      // the Future signals completion or failure of the whole stream.
      val (publisher, done) = src.via(flow).toMat(testSink)(Keep.both).run()

      // Three rounds of: Single, Single, Broadcast, Broadcast.
      val round: Seq[TheFlexiRoute.RouteMessage] = Seq(
        TheFlexiRoute.Single,
        TheFlexiRoute.Single,
        TheFlexiRoute.Broadcast,
        TheFlexiRoute.Broadcast
      )
      for (_ <- 1 to 3; message <- round) publisher.sendNext(message)

      println("sending complete")
      publisher.sendComplete()

      // Wait for the stream to terminate instead of sleeping for a fixed
      // time; a failure inside the graph fails the Future and thus the test.
      Await.result(done, 10.seconds)
    } catch {
      // Log and rethrow ordinary failures; fatal JVM errors propagate untouched.
      case NonFatal(e) =>
        logger.error("OHNO", e)
        throw e
    }
  }
}
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ:
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
To post to this group, send email to [email protected].
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.