Hey together,
I have a use case where I want to connect two akka-http WebSocket streams
on a server + maintain backpressure. But the two
streams might be opened with some delay (i.e. the first one is opened some
minutes before the 2nd and should be "paused").
What should I use as Sink / Source to answer the HTTP Upgrade Request? An
Actor as bridge between two streams?
*Problem:*
lets assume I have the following setup
__________ ___________
__________
| | |
| | |
| | |
| | |
| client A | < - - WS - - > | Akka Http | < - - WS - - > |
client B |
| | | server
| | |
| _________ | |___________ |
|__________ |
Client A and Client B both connect to the server at roughly the same point
in time, either one could be the first to actually establish the connection.
What I want to do is forward all data that comes from A to B and the other
way around.
But I want to make use of backpressure such that neither of the two can get
overwhelmed. Especially during the connection setup backpressure should be
applied to
throttle the client that connected first until the 2nd client is connected
and data can be forwarded at all.
To complete the WebSocket Upgrade request I need a Source and a Sink. If I
cannot/do not want to keep the Upgrade Request open until the 2nd client
connects the question arises
what should I use as Source and Sink. My current solution uses an
intermediate actor to bridge between two streams, kind of a dynamic network.
*From the stream documentation: "Dynamic networks need to be modeled by
explicitly using the Reactive Streams interfaces for plugging different
engines together."*
Like this the first client will connect and two ActorBridges will be
created one serves as Source, the other one as Sink. The Source does
nothing and the Sink accepts a few items but later on throttles the
first client by applying backpressure. Once the second client connects it
will use the same two ActorBridges but in reverse roles. Thus sources and
sinks are interconnected and data flows.
Is that the correct way to do it? Is there a solution with less custom
code? Are there any caveats that I miss?
best regards Michael
*Example Code:*
class StreamBridge extends ActorSubscriber with ActorPublisher[Int] {
val msgQueue = mutable.Queue[Int]()
val MaxInFlight = 5
override def receive: Receive = {
// Subscriber
case OnNext(in: Int) => {
msgQueue.enqueue(in)
deliver()
}
case OnError(err: Exception) => {
onError(err)
context.stop(self)
}
case OnComplete => {
onComplete()
context.stop(self)
}
// Publisher
case Request(num) => {
deliver()
}
case Cancel => {
cancel()
context.stop(self)
}
}
def deliver(): Unit = {
while (isActive && totalDemand > 0 && msgQueue.nonEmpty) {
onNext(msgQueue.dequeue())
}
}
val requestStrategy = new MaxInFlightRequestStrategy(MaxInFlight) {
def inFlightInternally: Int = {
msgQueue.size
}
}
}
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ:
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
To post to this group, send email to [email protected].
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.