I should add that the error is coming from:
p.outArray.foreach{o =>
ctx.emit(o)(element)
}
>From the broadcast state.
On Sunday, July 5, 2015 at 3:13:00 PM UTC-6, Ben Kornmeier wrote:
>
> Hello,
> I seem to be having trouble with switching states in my FlexiRoute.
>
> This code is quite contrived, but the gist is that the router would
> receive different types of messages one type of message can go to one
> outlet (Single), the other must goto all outlets (Broadcast). The router if
> it is the DemandFromAny and it receives a Broadcast would buffer that
> message and switch to a DemandFromAll state, once it got a message of any
> kind it would release it's buffer to all outlets and then determine if it
> should stay in a DemandFromAll or return to a DemandFromAny state. For
> simplicity sake I have ommited the buffering part. Running the code below
> with debug logging enable produced the following output:
>
> fail due to: requirement failed: emit to [out1] not allowed when no demand
> available
>
>
> Please let me know if I am misunderstanding something or if I have a bug
> in my code.
>
>
> Here is a quick test to replicate the behavior:
>
> The flexiRoute impl:
>
> package com.protectwise.streamy.stage
>
> import akka.stream._
> import akka.stream.scaladsl.FlexiRoute
>
> /**
> * Created by ben on 7/2/15.
> */
>
> object TheFlexiRoute {
> trait RouteMessage
> case object Single extends RouteMessage
> case object Broadcast extends RouteMessage
>
> }//
> class TheFlexiRoute(outs:Int) extends
> FlexiRoute[TheFlexiRoute.RouteMessage,UniformFanOutShape[TheFlexiRoute.RouteMessage,
> TheFlexiRoute.RouteMessage]](
> shape = UniformFanOutShape(
> inlet = new Inlet[TheFlexiRoute.RouteMessage]("flexIn"),
> outlets = {
> (0 until outs).map { i =>
> println(s"Creating out-$i")
> new Outlet[TheFlexiRoute.RouteMessage](s"out${i}")
> }.toSeq
> }:_*
> ),
> attributes = Attributes.name("streamyBalance")
> ) {
> import FlexiRoute._
> override def createRouteLogic(p: PortT) = new
> RouteLogic[TheFlexiRoute.RouteMessage] {
> //protected var lastM:Option[Mes] = None
>
> val broadcastState = {
> State(DemandFromAll(p.outlets)) {
> (ctx, _, element) =>
> element match {
> case TheFlexiRoute.Broadcast =>
> p.outArray.foreach{o =>
> ctx.emit(o)(element)
> }
> SameState
> case TheFlexiRoute.Single =>
> normalState
>
>
> }
> }
> }
>
> val normalState:State[OutPort] = {
> State(DemandFromAny(p.outlets)) {
> (ctx, out, element) =>
> element match {
> case TheFlexiRoute.Broadcast =>
> broadcastState
>
> case TheFlexiRoute.Single =>
>
> ctx.emit(out.asInstanceOf[Outlet[TheFlexiRoute.RouteMessage]])(element)
> SameState
>
>
> }
> }
> }
>
> override def initialState = normalState
> }
> }
>
>
>
> And then here is my test code (ScalaTest):
>
> package com.protectwise.streamy.stage
>
> import akka.testkit.{TestProbe, TestKit}
> import com.protectwise.logging.Logging
> import com.protectwise.streamy.message.{Tick, SourceId, Message, Tuple}
> import com.typesafe.config.{ConfigFactory, Config}
> import org.scalatest.{Matchers, FlatSpecLike}
>
> import akka.actor.ActorSystem
> import akka.stream.scaladsl._
> import akka.stream.testkit.{TestSubscriber, StreamTestKit, TestPublisher}
> import akka.stream._
> import akka.
>
> ...
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ:
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
To post to this group, send email to [email protected].
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.