I am trying to achieve two-way, real-time communication between an akka-http 
WebSocket server and a client.

server code : 

/**
 * WebSocket endpoint plus a helper route that pushes a message to the client.
 *
 *  - `GET /ws-echo` upgrades to a WebSocket handled by a fresh flow from `getFlow()`.
 *  - `GET /send-client` offers a binary "ta ta" message to the queue belonging to the
 *    most recently created flow.
 */
object EchoService {

  /**
   * Queue of the most recently created WebSocket flow.
   *
   * Starts out as a failed future (instead of `null`) so that calling `/send-client`
   * before any client has connected yields a failed `Future` rather than a
   * `NullPointerException`. Marked `@volatile` because it is reassigned by `getFlow()`
   * and read by the `/send-client` route.
   */
  @volatile private var _sQueue: Future[SourceQueue[Message]] =
    Future.failed(new IllegalStateException("No websocket client has connected yet"))

  /** Future of the queue feeding the outgoing side of the latest WebSocket flow. */
  def sourceQueue: Future[SourceQueue[Message]] = _sQueue

  def route: Route = path("ws-echo") {
    get {
      handleWebSocketMessages(getFlow())
    }
  } ~ path("send-client") {
    get {
      // The HTTP response does not wait for the offer; a failed offer is only logged.
      sourceQueue
        .flatMap { queue =>
          println(s"Offering message from server")
          queue.offer(BinaryMessage(ByteString("ta ta")))
        }
        .failed
        .foreach(e => println(s"Offering message from server failed: $e"))
      complete("Sent from server successfully")
    }
  }

  /** Prints strict text messages; anything else is reported as unknown. */
  val sink: Sink[Message, _] =
    Sink.foreach {
      case message: TextMessage.Strict =>
        println(message.text)

      case _ => println(s"received unknown message format")
    }

  /**
   * Builds the flow for one WebSocket connection: incoming messages go to `sink`,
   * outgoing messages come from a new `Source.queue`.
   *
   * Every call replaces `_sQueue` with the future of the new queue, so messages
   * offered to an earlier connection's queue are not carried over to this one.
   */
  def getFlow() = {
    println(s"calling get flow")
    val source = {
      // The queue only exists once the flow is materialised, hence the promise.
      val queuePromise = Promise[SourceQueue[Message]]()
      val queueSource = Source
        .queue[Message](Int.MaxValue, OverflowStrategy.backpressure)
        .mapMaterializedValue { queue =>
          queuePromise.trySuccess(queue)
          queue
        }
      _sQueue = queuePromise.future
      queueSource
    }
    Flow.fromSinkAndSourceMat(sink, source)(Keep.right)
  }

}

Client Code : 

/**
 * WebSocket client that connects to `ws://localhost:8080/ws-echo`, prints incoming
 * strict text messages, sends a "Heart Beat" text message when idle for one second,
 * and reconnects whenever the connection terminates.
 */
object Client extends App {

  implicit val actorSystem: ActorSystem = ActorSystem("akka-system")
  implicit val flowMaterializer: ActorMaterializer = ActorMaterializer()

  val config = actorSystem.settings.config
  val interface = config.getString("app.interface")
  val port = config.getInt("app.port")

  // Completed by the first materialisation so that callers who grabbed `sourceQueue`
  // before the first connection still get a queue once it exists.
  private val firstQueue = Promise[SourceQueue[Message]]()

  // Re-pointed on every materialisation. `flow` is materialised again on each
  // reconnect and a promise can only be completed once, so a single promise
  // would keep handing out the queue of the first (terminated) connection.
  @volatile private var currentQueue: Future[SourceQueue[Message]] = firstQueue.future

  /** Future of the queue feeding the outgoing side of the current connection. */
  def sourceQueue: Future[SourceQueue[Message]] = currentQueue

  /** Outgoing messages: whatever is offered to the queue, plus idle heart beats. */
  val source =
    Source
      .queue[Message](Int.MaxValue, OverflowStrategy.backpressure)
      .mapMaterializedValue { queue =>
        firstQueue.trySuccess(queue)
        currentQueue = Future.successful(queue)
        queue
      }
      .keepAlive(FiniteDuration(1, TimeUnit.SECONDS), () => TextMessage.Strict("Heart Beat"))

  // print each incoming strict text message; answer anything else with a notice
  val printSink: Sink[Message, Future[Done]] =
    Sink.foreach {
      case message: TextMessage.Strict =>
        println(message.text)

      case _ =>
        sourceQueue.map { queue =>
          println(s"offering message on client")
          queue.offer(TextMessage("received unknown"))
        }
        println(s"received unknown message format")
    }

  val flow =
    Flow.fromSinkAndSourceMat(printSink, source)(Keep.both)

  /** Schedules the next connection attempt after a short pause instead of retrying in a tight loop. */
  private def reconnectLater(): Unit = {
    actorSystem.scheduler.scheduleOnce(FiniteDuration(1, TimeUnit.SECONDS))(connect())
    ()
  }

  /**
   * Opens the WebSocket connection, prints the outcome of the upgrade, and schedules
   * a reconnect once the incoming side of the connection completes or fails.
   */
  def connect(): Unit = {
    val (upgradeResponse, (sinkClose, _)) =
      Http().singleWebSocketRequest(WebSocketRequest("ws://localhost:8080/ws-echo"), flow)

    val connected = upgradeResponse.map { upgrade =>
      // just like a regular http request we can get 404 NotFound,
      // with a response body, that will be available from upgrade.response
      if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
        Done
      } else {
        throw new RuntimeException(s"Connection failed: ${upgrade.response.status}")
      }
    }

    connected.onComplete(println)

    sinkClose.onComplete {
      case Success(_) =>
        println(s"Connection closed gracefully")
        reconnectLater()

      case util.Failure(e) =>
        println(s"Connection closed with an error $e")
        reconnectLater()
    }
  }

  connect()

}


1) The client and server connect successfully and exchange messages.

2) The client goes down. Now I want to buffer the sourceQueue on the server and 
reuse it when the client comes back up (currently, when it comes back up, I create 
a new source/sourceQueue, which works only for new messages).

How can I achieve this?


-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To post to this group, send email to [email protected].
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to