Hi there,

I am trying to design a Bidi Flow between a client and server. The idea is 
for the server (upon receiving a TCP inbound connection) start sending data 
("Hello" messages) on a interval of every 2 seconds to the connected 
client. The client also sends "Ping" messages (every 5 seconds) and the 
server should respond with a "Pong" message for every received "Ping" 
message. 

The code is below and at the moment it is not quite working as expected. I 
can see the client receiving "Hello" messages, but no "Pong" messages are 
being received on the client side. It seems the server never received a 
"Ping" message in first place. 

Please, any help on that would be very much appreciate it.


Thanks,
Leo




import java.net.InetSocketAddress

import akka.actor.ActorSystem
import akka.stream.scaladsl.{BidiFlow, Broadcast, Flow, GraphDSL, Merge, 
Sink, Source, Tcp}
import akka.stream.{ActorMaterializer, BidiShape}
import akka.util.ByteString
import com.typesafe.config.ConfigFactory
import scala.concurrent.duration._


object BidiFlowMain extends App {

  self =>

  val config = ConfigFactory.load()

  implicit val system = ActorSystem("BidiFlowMain", config)
  implicit val executor = system.dispatcher
  implicit val materializer = ActorMaterializer()

  val server = "127.0.0.1"
  val port = 8888

  new Server {
    override def server: String = self.server
    override def port: Int = self.port

    override implicit def system: ActorSystem = self.system
    override implicit def materializer: ActorMaterializer = 
self.materializer
  }


  new Client {
    override def server: String = self.server
    override def port: Int = self.port

    override implicit def system: ActorSystem = self.system
    override implicit def materializer: ActorMaterializer = 
self.materializer
  }
}

trait Client {

  def server: String
  def port: Int

  implicit def system: ActorSystem
  implicit def materializer: ActorMaterializer


  val remoteAddress = new InetSocketAddress("127.0.0.1", 8888)
  def connection = Tcp().outgoingConnection(remoteAddress)

  val clientProcessingFlow = Flow[ByteString].map { bs =>
    println(s"Client received: [${bs.utf8String}]")
    bs
  }

  val pingFlow = Flow[ByteString].map { bs =>
    if (bs.utf8String.toUpperCase.contains("PONG")) {
      println("Received PONG from server")
      true
    }
    else false
  }

  val delay = Flow[ByteString].map { bs =>
    Thread.sleep(1000)
    bs
  }


  val bidi = BidiFlow.fromGraph(GraphDSL.create() { implicit b =>
    
    // construct and add the top flow, going outbound
    val outbound = b.add(Flow[ByteString]
      .filter(_.utf8String.toUpperCase.contains("PING"))
      .map{ bs => println(s"Client issuing a ping"); bs}
      )

    // construct and add the bottom flow, going inbound
    val inbound = b.add(clientProcessingFlow)

    BidiShape.fromFlows(outbound, inbound)
  })

  val flow = bidi.join(connection)

  Source.tick(0 seconds, 5 seconds, ByteString("Ping"))
    .via(flow)
    .runWith(Sink.ignore)
}


trait Server {

  def server: String
  def port: Int

  implicit def system: ActorSystem
  implicit def materializer: ActorMaterializer

  // start data server
  Tcp().bind(server, port) runForeach { conn =>

    val tcpFlow = conn.flow

    val serverProcessingFlow = Flow[ByteString].map { bs =>
       println(s"Server received: [${bs.utf8String}]")
      bs
    }
    .filter(bs => bs.utf8String.toUpperCase.contains("PING"))
    .map(bs => ByteString("Pong"))
    .via(tcpFlow)
    
   val bidi = BidiFlow.fromGraph(GraphDSL.create() { implicit b =>

      // construct and add the top flow, going outbound
      val outbound = b.add(tcpFlow)

      // construct and add the bottom flow, going inbound
      val inbound = b.add(serverProcessingFlow)

      BidiShape.fromFlows(outbound, inbound)
    })

    val flow = bidi.join(tcpFlow)

    Source.tick(0 seconds, 2 seconds, ByteString("Hello"))
      .map{bs => println(s"Sending to client = [${bs.utf8String}]"); bs}
      .via(tcpFlow)
      .runWith(Sink.ignore)

    println(s"Setting handler for: [$conn]")
    conn.handleWith(flow)
  }

  println(s"Server started - listening on $server:$port")
}

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To post to this group, send email to [email protected].
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to