*Im trying to construct a GaphStage that will be used when connecting to a TCP 
socket. The following protocol is used when connecting:
1. Materialize TCP flow to create an outgoingConnection
2. Socket return Connect message
3. If Connect message OK send AuthenticationMessage
4. If OK response on AuthenticationMessage send SubscriptionMessage
5. If Subscription OK forward all incomming messages that are not Heatbeat

This is the code for the GraphStage.*



class SubscriptionGraphStage(token: SessionToken)
  extends GraphStage[BidiShape[BetfairRequest, BetfairRequest, BetfairEvent, 
BetfairEvent]]
    with LazyLogging {

  import com.bfg.infrastructure.betfair.temp.TcpProxyState._

  val bytesIn: Inlet[BetfairRequest] = Inlet("OutgoingTCP.in")

  val bytesOut: Outlet[BetfairEvent] = Outlet("OutgoingTCP.out")
  val sslIn: Inlet[BetfairEvent] = Inlet("OutgoingSSL.in")
  val sslOut: Outlet[BetfairRequest] = Outlet("OutgoingSSL.out")

  override def shape: BidiShape[BetfairRequest, BetfairRequest, BetfairEvent, 
BetfairEvent] = BidiShape.apply(bytesIn, sslOut, sslIn, bytesOut)

  val authMessage = RequestAuthentication(session = token.sessionToken, id = 1, 
appKey = "tDwhr80fJKsOW725")
  val marketSubscription: BetfairRequest = RequestMarketSubscription(
    id=123,
    marketFilter = MarketSubscriptionMarketFilter(
      eventTypeIds = List("7"),
      marketTypes = List("WIN"),
      countryCodes = List("GB")
    ),
    marketDataFilter = MarketSubscriptionMarketDataFilter(
      fields = List("EX_ALL_OFFERS","EX_MARKET_DEF"),
      ladderLevels = Some(3)
    )
  )
  def connectionValid(event: BetfairEvent): Boolean = event match {
    case m: ConnectionMessage => true
    case _ => false
  }

  def authenticationValid(event: BetfairEvent): Boolean = event match {
    //todo simplifed need to control id
    case m: StatusMessage if m.statusCode == "SUCCESS" => true
    case _ => false
  }
  def subscriptionValid(event: BetfairEvent): Boolean = event match {
    //todo simplifed need to control id
    case m: StatusMessage if m.statusCode == "SUCCESS" => true
    case _ => false
  }

  def heatbeatValid(event: BetfairEvent): Boolean = event match {
    case m: MarketChangeMessage if m.ct == Some("HEARTBEAT") => true
    case _ => false
  }

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = 
new GraphStageLogic(shape) {
    private var state: State = Connecting

    // I1
    setHandler(sslIn, new InHandler {
      override def onPush() = {
        state match {
          case Connecting =>
            val event = grab(sslIn)
            logger.info(event.toString)
            if (connectionValid(event)) {
              state = Authenticating
              logger.info(state.toString)
              push(sslOut, authMessage)
            } else {
              failStage(new BetfairConnectionFailedException(s"Connection to 
Betfair failed"))
            }
          case Authenticating =>
            val event = grab(sslIn)
            logger.info(event.toString)
            if (authenticationValid(event)) {
              state = Subscribing
              logger.info(state.toString)
              push(sslOut, marketSubscription)
            } else {
              failStage(new 
BetfairAuthenticationFailedException(s"Authentication with Betfair failed"))
            }
          case Subscribing =>
            val event = grab(sslIn)
            logger.info(event.toString)
            if (subscriptionValid(event)) {
              state = Subscribed
              logger.info(state.toString)
            } else {
              failStage(new BetfairSubscriptionFailedException(s"Subscription 
with Betfair failed"))
            }
          case Subscribed =>
            val event = grab(sslIn)
            logger.info(event.toString)
            if (heatbeatValid(event)) {
              logger.info("Heartbeat")
              pull(sslIn)
            } else {
              push(bytesOut, event)
              logger.info("MarketChange")
            }
        }
      }

      override def onUpstreamFinish(): Unit = complete(bytesOut)
    })

    // I2
    setHandler(bytesIn, new InHandler {
      override def onPush() = {
      }

      override def onUpstreamFinish(): Unit = complete(sslOut)
    })

    // Called when transport pull for data
    // O1
    setHandler(bytesOut, new OutHandler {
      override def onPull() = state match {
        case Connecting =>
        case Authenticating =>
        case Subscribing =>
        case Subscribed => pull(sslIn)
      }

      override def onDownstreamFinish(): Unit = cancel(sslIn)
    })

    // O2
    setHandler(sslOut, new OutHandler {
      override def onPull() = state match {
        case Connecting => pull(sslIn)
        case Authenticating => pull(sslIn)
        case Subscribing => pull(sslIn)
        case _ =>
      }

      override def onDownstreamFinish(): Unit = cancel(bytesIn)
    })

  }
}


This code works as expected in the following test:


class SubscriptionGraphStageTest(_system: ActorSystem)
  extends TestKit(_system)
    with FlatSpecLike with Matchers
    with BeforeAndAfterAll
    with ScalaFutures
{
  implicit val materializer = 
ActorMaterializer(ActorMaterializerSettings(_system).withFuzzing(true))

  def this() = this(ActorSystem())

  "SubscriptionGraphStage" should "handle a successful subscription" in new 
Context {
    source.sendNext(connectionOK)
    val authenticationRequest = sink.requestNext()

    source.sendNext(authenticationOK)
    val subscriptionRequest = sink.requestNext()

    source.sendNext(subscriptionOK)
    sink.request(2)

    source.sendNext(heartbeat)
    source.sendNext(marketChange)
    source.sendNext(heartbeat)
    source.sendNext(marketChange)
    fromBetfairProbe.requestNext(marketChange)
    fromBetfairProbe.requestNext(marketChange)

  }
  it should "handle failures in subscription" in {}

  override def afterAll(): Unit = {
    materializer.shutdown()
    _system.terminate()
  }

  trait Context {
    // After materialization of TLS we get connection message
    val connectionOK = ConnectionMessage(op = "connection", connectionId = 
"050-41605-1391626")
    // After authentication message we get
    val authenticationOK = StatusMessage(op = "status", id = Some(1), 
statusCode = "SUCCESS", connectionClosed = false, errorCode = None, 
errorMessage = None, connectionId = None)
    // After MarketSubscription
    val subscriptionOK = StatusMessage(op = "status", id = Some(123), 
statusCode = "SUCCESS", connectionClosed = false, errorCode = None, 
errorMessage = None, connectionId = None)
    // After connection we get heartbeat
    val heartbeat = MarketChangeMessage(op = "mcm", id = 123, clk = "AAAAAAAA", 
pt = 1495773941868L, ct = Some("HEARTBEAT"), heartbeatMs = None, initialClk = 
None, mc = None, conflateMs = None, segmentType = None)
    // Finally we get MarketChangeMessage
    val marketChange = MarketChangeMessage(op = "mcm", id = 123, clk = 
"AAAAAAAA", pt = 1495773941868L, ct = Some("BOOM"), heartbeatMs = None, 
initialClk = None, mc = None, conflateMs = None, segmentType = None)

    val proxyStage = new 
SubscriptionGraphStage(SessionToken("fasdfasdf","SUCCESS"))
    val proxyFlow: BidiFlow[BetfairRequest, BetfairRequest, BetfairEvent, 
BetfairEvent, NotUsed] = BidiFlow.fromGraph(proxyStage)

    val fromBetfairProbe = TestSubscriber.probe[BetfairEvent]()
    val toBetfairProbe = TestPublisher.probe[BetfairRequest]()

    val transportFlow = Flow.fromSinkAndSource(
      Sink.fromSubscriber(fromBetfairProbe),
      Source.fromPublisher(toBetfairProbe))

    val flowUnderTest = proxyFlow.reversed.join(transportFlow)

    val (source, sink) = TestSource.probe[BetfairEvent]
      .via(flowUnderTest)
      .toMat(TestSink.probe[BetfairRequest])(Keep.both)
      .run()
  }
}


However when I run this on an actual TCP connection I get to the Subscribed 
stage but then newer any Heartbeat or 

MarketChange arrive. This drives me crazy since I have no idea about how to 
debug or trouble shoot this? My guess is that

im missing some pull etc but BidiShape is really messy to understand. Any help 
on resolving this would help! If you need

more info or code just ask but I wanted to start with the minimal.


All this code is heavily inspired by this blog article: BLOG 
<http://blog.scalac.io/2017/04/25/akka-streams-graph-stage.html>

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To post to this group, send email to [email protected].
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to