Hi Michael,

here's an idea how it could work without actors:

 * when the first client connects, create two yet Publisher/Subscriber 
pairs and wrap them into a Flow[Message, Message] while crossing the links, 
pass one of the flows as the websocket handler
 * when the second client connects pass the other flow as the websocket 
handler

See this comment about how to do it: 
https://github.com/akka/akka/issues/17610#issuecomment-113127961

and

https://github.com/akka/akka/issues/17769

In fact, it seemed easy and interesting enough to try, so I gave it a quick 
shot and it works indeed:

https://github.com/spray/akka/commit/720b06dd78b65f96ddf316f63108dd959b896ac1

(I made the screenshot with websocketd's dev console 
(https://github.com/joewalnes/websocketd/wiki/Developer-console))

Johannes

On Friday, August 14, 2015 at 1:31:03 PM UTC+2, Michael Zinsmaier wrote:
>
> Hey together,
>
> I have a use case where I want to connect two akka-http WebSocket streams 
> on a server + maintain backpressure. But the two
> streams might be opened with some delay (i.e. the first one is opened some 
> minutes before the 2nd and should be "paused").
>
> What should I use as Sink / Source to answer the HTTP Upgrade Request? An 
> Actor as bridge between two streams?
>
> *Problem:*
>
> lets assume I have the following setup 
>
>      __________                        ___________                        
> __________
>     |                  |                      |                    
> |                      |                  |
>     |                  |                      |                    
> |                      |                  |
>     |  client A     | < - - WS - - > |  Akka Http    |  < - - WS - - > |  
> client B     |
>     |                  |                      |     server      
> |                       |                  |
>     | _________ |                      |___________ |                      
> |__________ |
>
>
> Client A and Client B both connect to the server at roughly the same point 
> in time, either one could be the first to actually establish the connection.
> What I want to do is forward all data that comes from A to B and the other 
> way around.
>
> But I want to make use of backpressure such that neither of the two can 
> get overwhelmed. Especially during the connection setup backpressure should 
> be applied to
> throttle the client that connected first until the 2nd client is connected 
> and data can be forwarded at all.
>
> To complete the WebSocket Upgrade request I need a Source and a Sink. If I 
> cannot/do not want to keep the Upgrade Request open until the 2nd client 
> connects the question arises
> what should I use as Source and Sink. My current solution uses an 
> intermediate actor to bridge between two streams, kind of a dynamic network.
>
> *From the stream documentation: "Dynamic networks need to be modeled by 
> explicitly using the Reactive Streams interfaces for plugging different 
> engines together."*
>
> Like this the first client will connect and two ActorBridges will be 
> created one serves as Source, the other one as Sink. The Source does 
> nothing and the Sink accepts a few items but later on throttles the 
> first client by applying backpressure. Once the second client connects it 
> will use the same two ActorBridges but in reverse roles. Thus sources and 
> sinks are interconnected and data flows.
>
> Is that the correct way to do it? Is there a solution with less custom 
> code? Are there any caveats that I miss?
>
>
>
> best regards Michael
>
>
> *Example Code:*
>
> class StreamBridge extends ActorSubscriber with ActorPublisher[Int] {
>
>   val msgQueue = mutable.Queue[Int]()
>   val MaxInFlight = 5
>
>   override def receive: Receive = {
>     // Subscriber
>     case OnNext(in: Int) => {
>       msgQueue.enqueue(in)
>       deliver()
>     }
>     case OnError(err: Exception) => {
>       onError(err)
>       context.stop(self)
>     }
>     case OnComplete => {
>       onComplete()
>       context.stop(self)
>     }
>     // Publisher
>     case Request(num) => {
>       deliver()
>     }
>     case Cancel => {
>       cancel()
>       context.stop(self)
>     }
>   }
>
>   def deliver(): Unit = {
>     while (isActive && totalDemand > 0 && msgQueue.nonEmpty) {
>       onNext(msgQueue.dequeue())
>     }
>   }
>
>   val requestStrategy = new MaxInFlightRequestStrategy(MaxInFlight) {
>     def inFlightInternally: Int = {
>       msgQueue.size
>     }
>   }
> }
>
>
>

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To post to this group, send email to [email protected].
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to