Hi all,

I have a use case where I want to connect two akka-http WebSocket streams 
on a server while maintaining backpressure. The two streams might be opened 
with some delay between them (e.g. the first one is opened some minutes 
before the second and should be "paused" until then).

What should I use as Sink / Source to answer the HTTP Upgrade Request? An 
Actor as bridge between two streams?

*Problem:*

Let's assume I have the following setup:

     ____________                 _____________                 ____________
    |            |               |             |               |            |
    |  client A  | <- - WS - ->  |  Akka HTTP  | <- - WS - ->  |  client B  |
    |            |               |   server    |               |            |
    |____________|               |_____________|               |____________|


Client A and Client B both connect to the server at roughly the same point 
in time, either one could be the first to actually establish the connection.
What I want to do is forward all data that comes from A to B and the other 
way around.

But I want to make use of backpressure so that neither of the two can get 
overwhelmed. Especially during connection setup, backpressure should 
throttle the client that connected first until the second client is 
connected and data can be forwarded at all.

To complete the WebSocket Upgrade request I need a Source and a Sink. If I 
cannot or do not want to keep the Upgrade request open until the second 
client connects, the question arises what I should use as Source and Sink. 
My current solution uses an intermediate actor to bridge between the two 
streams, a kind of dynamic network.

*From the stream documentation: "Dynamic networks need to be modeled by 
explicitly using the Reactive Streams interfaces for plugging different 
engines together."*

With this approach, when the first client connects, two ActorBridges 
(StreamBridge in the example code below) are created: one serves as its 
Source, the other as its Sink. The Source emits nothing yet, and the Sink 
accepts a few items but then throttles the first client by applying 
backpressure. Once the second client connects, it uses the same two 
ActorBridges but in reversed roles. The sources and sinks are thus 
interconnected and data flows.
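
To illustrate the throttling I have in mind, here is a toy, single-threaded 
model in plain Scala. It is only a sketch and not Akka code: BridgeModel, 
offer and request are made-up names, and offer returning false stands in 
for "no upstream demand" in a real Reactive Streams setup.

```scala
import scala.collection.mutable

// Toy model of one bridge direction (hypothetical, not an Akka API).
final class BridgeModel[A](maxInFlight: Int) {
  private val buffer = mutable.Queue.empty[A]
  private var demand = 0L // requested by downstream, not yet served

  // Elements that actually reached the downstream side, in order
  val delivered = mutable.ArrayBuffer.empty[A]

  // Upstream tries to push an element; false means "backpressured".
  def offer(a: A): Boolean =
    if (buffer.size >= maxInFlight) false
    else {
      buffer.enqueue(a)
      drain()
      true
    }

  // Downstream signals that it can take n more elements.
  def request(n: Long): Unit = {
    demand += n
    drain()
  }

  // Forward buffered elements while downstream demand is left.
  private def drain(): Unit =
    while (demand > 0 && buffer.nonEmpty) {
      delivered += buffer.dequeue()
      demand -= 1
    }
}
```

With maxInFlight = 5 and no second client yet (request never called), only 
the first five offers are accepted and nothing is delivered. Once the other 
side calls request(3), three buffered elements are forwarded and the 
bridge has room for new ones again.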

Is that the correct way to do it? Is there a solution with less custom 
code? Are there any caveats that I am missing?



Best regards,
Michael


*Example Code:*

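import akka.stream.actor.{ActorPublisher, ActorSubscriber, MaxInFlightRequestStrategy}
import akka.stream.actor.ActorPublisherMessage.{Cancel, Request}
import akka.stream.actor.ActorSubscriberMessage.{OnComplete, OnError, OnNext}

import scala.collection.mutable

// Bridges two streams: subscribes to one and publishes its elements to the other.
class StreamBridge extends ActorSubscriber with ActorPublisher[Int] {

  val MaxInFlight = 5
  // Elements received from upstream but not yet requested by downstream
  val msgQueue = mutable.Queue[Int]()

  override def receive: Receive = {
    // Subscriber side
    case OnNext(in: Int) =>
      msgQueue.enqueue(in)
      deliver()
    case OnError(err) => // err is a Throwable, so this matches every failure
      onError(err)
      context.stop(self)
    case OnComplete =>
      // Note: elements still buffered in msgQueue are dropped here
      onComplete()
      context.stop(self)
    // Publisher side
    case Request(_) =>
      deliver()
    case Cancel =>
      cancel()
      context.stop(self)
  }

  // Push buffered elements downstream while there is demand.
  def deliver(): Unit = {
    while (isActive && totalDemand > 0 && msgQueue.nonEmpty) {
      onNext(msgQueue.dequeue())
    }
  }

  // Request from upstream only while fewer than MaxInFlight elements are buffered.
  val requestStrategy = new MaxInFlightRequestStrategy(MaxInFlight) {
    def inFlightInternally: Int = msgQueue.size
  }
}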
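
For completeness, this is roughly how I would wire two such bridge actors 
into the per-client flows. It is a minimal sketch, assuming Akka Streams 
2.4.2 or later (where Source.fromPublisher, Sink.fromSubscriber and 
Flow.fromSinkAndSource are available) and a version that still ships 
ActorPublisher / ActorSubscriber. BridgeWiring and clientFlow are names I 
made up, and the elements are Int as in the example above; a real WebSocket 
handler would need a flow of WebSocket messages instead.

```scala
import akka.NotUsed
import akka.actor.ActorRef
import akka.stream.actor.{ActorPublisher, ActorSubscriber}
import akka.stream.scaladsl.{Flow, Sink, Source}

// Hypothetical helper, not part of Akka.
object BridgeWiring {
  // inbound:  bridge actor that receives what this client sends
  // outbound: bridge actor whose elements are sent to this client
  def clientFlow(inbound: ActorRef, outbound: ActorRef): Flow[Int, Int, NotUsed] =
    Flow.fromSinkAndSource(
      Sink.fromSubscriber(ActorSubscriber[Int](inbound)),
      Source.fromPublisher(ActorPublisher[Int](outbound)))
}
```

With two StreamBridge actors aToB and bToA, client A would be answered with 
clientFlow(inbound = aToB, outbound = bToA) and client B with 
clientFlow(inbound = bToA, outbound = aToB). As far as I know an 
ActorPublisher accepts only a single subscriber, so each pair of bridges 
can serve exactly one pairing of clients.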

