Hello,
I seem to be having trouble with switching states in my FlexiRoute. 

This code is quite contrived, but the gist is that the router would receive 
different types of messages one type of message can go to one outlet 
(Single), the other must goto all outlets (Broadcast). The router if it is 
the DemandFromAny and it receives a Broadcast would buffer that message and 
switch to a DemandFromAll state, once it got a message of any kind it would 
release it's buffer to all outlets and then determine if it should stay in 
a DemandFromAll or return to a DemandFromAny state. For simplicity sake I 
have ommited the buffering part. Running the code below with debug logging 
enable produced the following output:

fail due to: requirement failed: emit to [out1] not allowed when no demand 
> available
>

Please let me know if I am misunderstanding something or if I have a bug in 
my code. 


Here is a quick test to replicate the behavior:

The flexiRoute impl:

package com.protectwise.streamy.stage

import akka.stream._
import akka.stream.scaladsl.FlexiRoute

/**
 * Created by ben on 7/2/15.
 */

object TheFlexiRoute {
  trait RouteMessage
  case object Single extends RouteMessage
  case object Broadcast extends RouteMessage

}//
class TheFlexiRoute(outs:Int) extends 
FlexiRoute[TheFlexiRoute.RouteMessage,UniformFanOutShape[TheFlexiRoute.RouteMessage,
 TheFlexiRoute.RouteMessage]](
  shape = UniformFanOutShape(
    inlet = new Inlet[TheFlexiRoute.RouteMessage]("flexIn"),
    outlets = {
      (0 until outs).map { i =>
        println(s"Creating out-$i")
        new Outlet[TheFlexiRoute.RouteMessage](s"out${i}")
      }.toSeq
    }:_*
  ),
  attributes = Attributes.name("streamyBalance")
)  {
  import FlexiRoute._
  override def createRouteLogic(p: PortT) = new 
RouteLogic[TheFlexiRoute.RouteMessage] {
    //protected var lastM:Option[Mes] = None

    val broadcastState = {
      State(DemandFromAll(p.outlets)) {
        (ctx, _, element) =>
          element match {
            case TheFlexiRoute.Broadcast =>
              p.outArray.foreach{o =>
                ctx.emit(o)(element)
              }
              SameState
            case TheFlexiRoute.Single =>
              normalState


          }
      }
    }

    val normalState:State[OutPort] = {
      State(DemandFromAny(p.outlets)) {
        (ctx, out, element) =>
          element match {
            case TheFlexiRoute.Broadcast =>
              broadcastState

            case TheFlexiRoute.Single =>
              
ctx.emit(out.asInstanceOf[Outlet[TheFlexiRoute.RouteMessage]])(element)
              SameState


          }
      }
    }

    override def initialState = normalState
  }
}



And then here is my test code (ScalaTest):

package com.protectwise.streamy.stage

import akka.testkit.{TestProbe, TestKit}
import com.protectwise.logging.Logging
import com.typesafe.config.{ConfigFactory, Config}
import org.scalatest.{Matchers, FlatSpecLike}

import akka.actor.ActorSystem
import akka.stream.scaladsl._
import akka.stream.testkit.{TestSubscriber, StreamTestKit, TestPublisher}
import akka.stream._
import akka.stream.testkit.scaladsl.{TestSink, TestSource}
/**
 * Created by ben on 7/2/15.
 */
class TheFlexiRouteTest extends TestKit(ActorSystem("Streamy")) with 
FlatSpecLike with Matchers with Logging {
  val srcId = SourceId("test",0)

  "TheFlexiRoute" should "Route messages correctly based on type" in {
    try {


      implicit val materializer: Materializer = ActorMaterializer()
      val messageProbe = TestProbe()

      val src = TestSource.probe[TheFlexiRoute.RouteMessage]
      //val sink = TestSink.probe[Message[String]]

      val testSink = Sink.foreach[TheFlexiRoute.RouteMessage]{m =>
        println(s"Sink gets: $m")
      }

      def testFlow(i: Int) = Flow.apply[TheFlexiRoute.RouteMessage].map { m =>
        println(s"Messages-$i: ${m}")
        m
      }

      val flow = Flow() { implicit b =>
        import FlowGraph.Implicits._
        val router = b.add(new TheFlexiRoute(2))
        val merge = b.add(Merge[TheFlexiRoute.RouteMessage](2))

        router ~> testFlow(0) ~> merge
        router ~> testFlow(1) ~> merge

        (router.in, merge.out)
      }

      val runableFlow = src.via(flow).to(testSink).run()


      runableFlow.sendNext(TheFlexiRoute.Single)

      runableFlow.sendNext(TheFlexiRoute.Single)
      runableFlow.sendNext(TheFlexiRoute.Broadcast)
      runableFlow.sendNext(TheFlexiRoute.Broadcast)

      runableFlow.sendNext(TheFlexiRoute.Single)
      runableFlow.sendNext(TheFlexiRoute.Single)
      runableFlow.sendNext(TheFlexiRoute.Broadcast)
      runableFlow.sendNext(TheFlexiRoute.Broadcast)

      runableFlow.sendNext(TheFlexiRoute.Single)
      runableFlow.sendNext(TheFlexiRoute.Single)
      runableFlow.sendNext(TheFlexiRoute.Broadcast)
      runableFlow.sendNext(TheFlexiRoute.Broadcast)

      Thread.sleep(15000L)
      println("sending complete")
      runableFlow.sendComplete()
      Thread.sleep(1000L)


    } catch {
      case e:Throwable =>
        logger.error("OHNO", e)
        throw e
    }
  }


}



-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To post to this group, send email to [email protected].
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to