*Im trying to construct a GaphStage that will be used when connecting to a TCP
socket. The following protocol is used when connecting:
1. Materialize TCP flow to create an outgoingConnection
2. Socket return Connect message
3. If Connect message OK send AuthenticationMessage
4. If OK response on AuthenticationMessage send SubscriptionMessage
5. If Subscription OK forward all incomming messages that are not Heatbeat
This is the code for the GraphStage.*
class SubscriptionGraphStage(token: SessionToken)
extends GraphStage[BidiShape[BetfairRequest, BetfairRequest, BetfairEvent,
BetfairEvent]]
with LazyLogging {
import com.bfg.infrastructure.betfair.temp.TcpProxyState._
val bytesIn: Inlet[BetfairRequest] = Inlet("OutgoingTCP.in")
val bytesOut: Outlet[BetfairEvent] = Outlet("OutgoingTCP.out")
val sslIn: Inlet[BetfairEvent] = Inlet("OutgoingSSL.in")
val sslOut: Outlet[BetfairRequest] = Outlet("OutgoingSSL.out")
override def shape: BidiShape[BetfairRequest, BetfairRequest, BetfairEvent,
BetfairEvent] = BidiShape.apply(bytesIn, sslOut, sslIn, bytesOut)
val authMessage = RequestAuthentication(session = token.sessionToken, id = 1,
appKey = "tDwhr80fJKsOW725")
val marketSubscription: BetfairRequest = RequestMarketSubscription(
id=123,
marketFilter = MarketSubscriptionMarketFilter(
eventTypeIds = List("7"),
marketTypes = List("WIN"),
countryCodes = List("GB")
),
marketDataFilter = MarketSubscriptionMarketDataFilter(
fields = List("EX_ALL_OFFERS","EX_MARKET_DEF"),
ladderLevels = Some(3)
)
)
def connectionValid(event: BetfairEvent): Boolean = event match {
case m: ConnectionMessage => true
case _ => false
}
def authenticationValid(event: BetfairEvent): Boolean = event match {
//todo simplifed need to control id
case m: StatusMessage if m.statusCode == "SUCCESS" => true
case _ => false
}
def subscriptionValid(event: BetfairEvent): Boolean = event match {
//todo simplifed need to control id
case m: StatusMessage if m.statusCode == "SUCCESS" => true
case _ => false
}
def heatbeatValid(event: BetfairEvent): Boolean = event match {
case m: MarketChangeMessage if m.ct == Some("HEARTBEAT") => true
case _ => false
}
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) {
private var state: State = Connecting
// I1
setHandler(sslIn, new InHandler {
override def onPush() = {
state match {
case Connecting =>
val event = grab(sslIn)
logger.info(event.toString)
if (connectionValid(event)) {
state = Authenticating
logger.info(state.toString)
push(sslOut, authMessage)
} else {
failStage(new BetfairConnectionFailedException(s"Connection to
Betfair failed"))
}
case Authenticating =>
val event = grab(sslIn)
logger.info(event.toString)
if (authenticationValid(event)) {
state = Subscribing
logger.info(state.toString)
push(sslOut, marketSubscription)
} else {
failStage(new
BetfairAuthenticationFailedException(s"Authentication with Betfair failed"))
}
case Subscribing =>
val event = grab(sslIn)
logger.info(event.toString)
if (subscriptionValid(event)) {
state = Subscribed
logger.info(state.toString)
} else {
failStage(new BetfairSubscriptionFailedException(s"Subscription
with Betfair failed"))
}
case Subscribed =>
val event = grab(sslIn)
logger.info(event.toString)
if (heatbeatValid(event)) {
logger.info("Heartbeat")
pull(sslIn)
} else {
push(bytesOut, event)
logger.info("MarketChange")
}
}
}
override def onUpstreamFinish(): Unit = complete(bytesOut)
})
// I2
setHandler(bytesIn, new InHandler {
override def onPush() = {
}
override def onUpstreamFinish(): Unit = complete(sslOut)
})
// Called when transport pull for data
// O1
setHandler(bytesOut, new OutHandler {
override def onPull() = state match {
case Connecting =>
case Authenticating =>
case Subscribing =>
case Subscribed => pull(sslIn)
}
override def onDownstreamFinish(): Unit = cancel(sslIn)
})
// O2
setHandler(sslOut, new OutHandler {
override def onPull() = state match {
case Connecting => pull(sslIn)
case Authenticating => pull(sslIn)
case Subscribing => pull(sslIn)
case _ =>
}
override def onDownstreamFinish(): Unit = cancel(bytesIn)
})
}
}
This code works as expected in the following test:
class SubscriptionGraphStageTest(_system: ActorSystem)
extends TestKit(_system)
with FlatSpecLike with Matchers
with BeforeAndAfterAll
with ScalaFutures
{
implicit val materializer =
ActorMaterializer(ActorMaterializerSettings(_system).withFuzzing(true))
def this() = this(ActorSystem())
"SubscriptionGraphStage" should "handle a successful subscription" in new
Context {
source.sendNext(connectionOK)
val authenticationRequest = sink.requestNext()
source.sendNext(authenticationOK)
val subscriptionRequest = sink.requestNext()
source.sendNext(subscriptionOK)
sink.request(2)
source.sendNext(heartbeat)
source.sendNext(marketChange)
source.sendNext(heartbeat)
source.sendNext(marketChange)
fromBetfairProbe.requestNext(marketChange)
fromBetfairProbe.requestNext(marketChange)
}
it should "handle failures in subscription" in {}
override def afterAll(): Unit = {
materializer.shutdown()
_system.terminate()
}
trait Context {
// After materialization of TLS we get connection message
val connectionOK = ConnectionMessage(op = "connection", connectionId =
"050-41605-1391626")
// After authentication message we get
val authenticationOK = StatusMessage(op = "status", id = Some(1),
statusCode = "SUCCESS", connectionClosed = false, errorCode = None,
errorMessage = None, connectionId = None)
// After MarketSubscription
val subscriptionOK = StatusMessage(op = "status", id = Some(123),
statusCode = "SUCCESS", connectionClosed = false, errorCode = None,
errorMessage = None, connectionId = None)
// After connection we get heartbeat
val heartbeat = MarketChangeMessage(op = "mcm", id = 123, clk = "AAAAAAAA",
pt = 1495773941868L, ct = Some("HEARTBEAT"), heartbeatMs = None, initialClk =
None, mc = None, conflateMs = None, segmentType = None)
// Finally we get MarketChangeMessage
val marketChange = MarketChangeMessage(op = "mcm", id = 123, clk =
"AAAAAAAA", pt = 1495773941868L, ct = Some("BOOM"), heartbeatMs = None,
initialClk = None, mc = None, conflateMs = None, segmentType = None)
val proxyStage = new
SubscriptionGraphStage(SessionToken("fasdfasdf","SUCCESS"))
val proxyFlow: BidiFlow[BetfairRequest, BetfairRequest, BetfairEvent,
BetfairEvent, NotUsed] = BidiFlow.fromGraph(proxyStage)
val fromBetfairProbe = TestSubscriber.probe[BetfairEvent]()
val toBetfairProbe = TestPublisher.probe[BetfairRequest]()
val transportFlow = Flow.fromSinkAndSource(
Sink.fromSubscriber(fromBetfairProbe),
Source.fromPublisher(toBetfairProbe))
val flowUnderTest = proxyFlow.reversed.join(transportFlow)
val (source, sink) = TestSource.probe[BetfairEvent]
.via(flowUnderTest)
.toMat(TestSink.probe[BetfairRequest])(Keep.both)
.run()
}
}
However when I run this on an actual TCP connection I get to the Subscribed
stage but then newer any Heartbeat or
MarketChange arrive. This drives me crazy since I have no idea about how to
debug or trouble shoot this? My guess is that
im missing some pull etc but BidiShape is really messy to understand. Any help
on resolving this would help! If you need
more info or code just ask but I wanted to start with the minimal.
All this code is heavily inspired by this blog article: BLOG
<http://blog.scalac.io/2017/04/25/akka-streams-graph-stage.html>
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ:
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
To post to this group, send email to [email protected].
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.