more specifically: https://github.com/kornstar11/akka/commit/12631dd982eeeff54a9f141126b63ef58e53a533#diff-f57483f18cb0c1f89d98c8a7ec93ed4fR108
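As a rough illustration of the failure mode, here is a minimal sketch in plain Scala with no Akka dependency. Every name in it (`Condition`, `Behavior`, `StaleRouter`, `FreshRouter`) is made up for the example and is not taken from the actual FlexiRouteImpl. It only contrasts capturing the demand condition once with looking it up from the current behavior on every check.

```scala
object PreconditionSketch {
  // Hypothetical stand-ins for the two demand conditions used in the route.
  sealed trait Condition
  case object DemandFromAny extends Condition
  case object DemandFromAll extends Condition

  final case class Behavior(condition: Condition)

  // True when the condition is satisfied by the per-outlet demand flags.
  def satisfied(c: Condition, demand: Seq[Boolean]): Boolean = c match {
    case DemandFromAny => demand.exists(identity)
    case DemandFromAll => demand.forall(identity)
  }

  // Buggy shape: the condition is captured once, at construction time,
  // so a later state switch does not change what is checked.
  final class StaleRouter(initial: Behavior) {
    private var behavior: Behavior = initial
    private val precondition: Condition = initial.condition
    def current: Behavior = behavior
    def become(next: Behavior): Unit = { behavior = next }
    def ready(demand: Seq[Boolean]): Boolean = satisfied(precondition, demand)
  }

  // Fixed shape: the condition is read from the current behavior on every check.
  final class FreshRouter(initial: Behavior) {
    private var behavior: Behavior = initial
    def become(next: Behavior): Unit = { behavior = next }
    def ready(demand: Seq[Boolean]): Boolean = satisfied(behavior.condition, demand)
  }

  def main(args: Array[String]): Unit = {
    val demand = Seq(true, false) // only the first outlet has demand
    val stale = new StaleRouter(Behavior(DemandFromAny))
    val fresh = new FreshRouter(Behavior(DemandFromAny))
    stale.become(Behavior(DemandFromAll))
    fresh.become(Behavior(DemandFromAll))
    println(stale.ready(demand)) // true: still checks DemandFromAny
    println(fresh.ready(demand)) // false: now checks DemandFromAll
  }
}
```

In the stale variant the router believes it may run the broadcast handler while only one outlet has demand, which is the situation in which emitting to every outlet would fail.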
Please let me know the next steps. Should I open an issue for this?

On Sunday, July 5, 2015 at 7:40:29 PM UTC-6, Ben Kornmeier wrote:
>
> Here is a proposed fix:
>
> https://github.com/kornstar11/akka/commit/12631dd982eeeff54a9f141126b63ef58e53a533
>
> Essentially, the precondition() call in the flexiRouteImpl was only being
> called once. Changing the method call to a class that wraps var
> behavior seems to have fixed it.
>
> On Sunday, July 5, 2015 at 5:48:35 PM UTC-6, Ben Kornmeier wrote:
>>
>> I should add that the error is coming from:
>>
>> p.outArray.foreach{o =>
>>   ctx.emit(o)(element)
>> }
>>
>> From the broadcast state.
>>
>> On Sunday, July 5, 2015 at 3:13:00 PM UTC-6, Ben Kornmeier wrote:
>>>
>>> Hello,
>>> I seem to be having trouble with switching states in my FlexiRoute.
>>>
>>> This code is quite contrived, but the gist is that the router receives
>>> different types of messages: one type of message can go to one
>>> outlet (Single), the other must go to all outlets (Broadcast). If the
>>> router is in the DemandFromAny state and receives a Broadcast, it would
>>> buffer that message and switch to a DemandFromAll state. Once it got a
>>> message of any kind, it would release its buffer to all outlets and then
>>> determine whether it should stay in the DemandFromAll state or return to
>>> the DemandFromAny state. For simplicity's sake I have omitted the
>>> buffering part. Running the code below with debug logging enabled
>>> produced the following output:
>>>
>>> fail due to: requirement failed: emit to [out1] not allowed when no
>>> demand available
>>>
>>> Please let me know if I am misunderstanding something or if I have a bug
>>> in my code.
>>>
>>> Here is a quick test to replicate the behavior:
>>>
>>> The flexiRoute impl:
>>>
>>> package com.protectwise.streamy.stage
>>>
>>> import akka.stream._
>>> import akka.stream.scaladsl.FlexiRoute
>>>
>>> /**
>>>  * Created by ben on 7/2/15.
>>>  */
>>>
>>> object TheFlexiRoute {
>>>   trait RouteMessage
>>>   case object Single extends RouteMessage
>>>   case object Broadcast extends RouteMessage
>>>
>>> }//
>>> class TheFlexiRoute(outs:Int) extends
>>> FlexiRoute[TheFlexiRoute.RouteMessage,UniformFanOutShape[TheFlexiRoute.RouteMessage,
>>> TheFlexiRoute.RouteMessage]](
>>>   shape = UniformFanOutShape(
>>>     inlet = new Inlet[TheFlexiRoute.RouteMessage]("flexIn"),
>>>     outlets = {
>>>       (0 until outs).map { i =>
>>>         println(s"Creating out-$i")
>>>         new Outlet[TheFlexiRoute.RouteMessage](s"out${i}")
>>>       }.toSeq
>>>     }:_*
>>>   ),
>>>   attributes = Attributes.name("streamyBalance")
>>> ) {
>>>   import FlexiRoute._
>>>   override def createRouteLogic(p: PortT) = new
>>> RouteLogic[TheFlexiRoute.RouteMessage] {
>>>     //protected var lastM:Option[Mes] = None
>>>
>>>     val broadcastState = {
>>>       State(DemandFromAll(p.outlets)) {
>>>         (ctx, _, element) =>
>>>           element match {
>>>             case TheFlexiRoute.Broadcast =>
>>>               p.outArray.foreach{o =>
>>>                 ctx.emit(o)(element)
>>>               }
>>>               SameState
>>>             case TheFlexiRoute.Single =>
>>>               normalState
>>>           }
>>>       }
>>>     }
>>>
>>>     val normalState:State[OutPort] = {
>>>       State(DemandFromAny(p.outlets)) {
>>>         (ctx, out, element) =>
>>>           element match {
>>>             case TheFlexiRoute.Broadcast =>
>>>               broadcastState
>>>
>>>             case TheFlexiRoute.Single =>
>>>               ctx.emit(out.asInstanceOf[Outlet[TheFlexiRoute.RouteMessage]])(element)
>>>               SameState
>>>           }
>>>       }
>>>     }
>>>
>>>     override def initialState = normalState
>>>   }
>>> }
>>>
>>> And then here is my test code (ScalaTest):
>>>
>>> package com.protectwise.streamy.stage
>>>
>>> import akka.testkit.{TestProbe, TestKit}
>>> import com.protectwise.logging.Logging
>>> import com.protectwise.streamy.message.{Tick, SourceId, Message, Tuple}
>>> import com.typesafe.config.{ConfigFactory, Config}
>>> import org.scalatest.{Matchers, FlatSpecLike}
>>>
>>> import akka.actor.ActorSystem
>>> import akka.stream.scaladsl._
>>> import akka.stream.testkit.{TestSubscriber, StreamTestKit, TestPublisher}
>>> import akka.stream._
>>> import akka.
>>>
>>> ...
