Hi there,
I am trying to design a Bidi Flow between a client and server. The idea is
for the server (upon receiving a TCP inbound connection) start sending data
("Hello" messages) on a interval of every 2 seconds to the connected
client. The client also sends "Ping" messages (every 5 seconds) and the
server should respond with a "Pong" message for every received "Ping"
message.
The code is below and at the moment it is not quite working as expected. I
can see the client receiving "Hello" messages, but no "Pong" messages are
being received on the client side. It seems the server never received a
"Ping" message in first place.
Please, any help on that would be very much appreciate it.
Thanks,
Leo
import java.net.InetSocketAddress
import akka.actor.ActorSystem
import akka.stream.scaladsl.{BidiFlow, Broadcast, Flow, GraphDSL, Merge,
Sink, Source, Tcp}
import akka.stream.{ActorMaterializer, BidiShape}
import akka.util.ByteString
import com.typesafe.config.ConfigFactory
import scala.concurrent.duration._
object BidiFlowMain extends App {
self =>
val config = ConfigFactory.load()
implicit val system = ActorSystem("BidiFlowMain", config)
implicit val executor = system.dispatcher
implicit val materializer = ActorMaterializer()
val server = "127.0.0.1"
val port = 8888
new Server {
override def server: String = self.server
override def port: Int = self.port
override implicit def system: ActorSystem = self.system
override implicit def materializer: ActorMaterializer =
self.materializer
}
new Client {
override def server: String = self.server
override def port: Int = self.port
override implicit def system: ActorSystem = self.system
override implicit def materializer: ActorMaterializer =
self.materializer
}
}
trait Client {
def server: String
def port: Int
implicit def system: ActorSystem
implicit def materializer: ActorMaterializer
val remoteAddress = new InetSocketAddress("127.0.0.1", 8888)
def connection = Tcp().outgoingConnection(remoteAddress)
val clientProcessingFlow = Flow[ByteString].map { bs =>
println(s"Client received: [${bs.utf8String}]")
bs
}
val pingFlow = Flow[ByteString].map { bs =>
if (bs.utf8String.toUpperCase.contains("PONG")) {
println("Received PONG from server")
true
}
else false
}
val delay = Flow[ByteString].map { bs =>
Thread.sleep(1000)
bs
}
val bidi = BidiFlow.fromGraph(GraphDSL.create() { implicit b =>
// construct and add the top flow, going outbound
val outbound = b.add(Flow[ByteString]
.filter(_.utf8String.toUpperCase.contains("PING"))
.map{ bs => println(s"Client issuing a ping"); bs}
)
// construct and add the bottom flow, going inbound
val inbound = b.add(clientProcessingFlow)
BidiShape.fromFlows(outbound, inbound)
})
val flow = bidi.join(connection)
Source.tick(0 seconds, 5 seconds, ByteString("Ping"))
.via(flow)
.runWith(Sink.ignore)
}
trait Server {
def server: String
def port: Int
implicit def system: ActorSystem
implicit def materializer: ActorMaterializer
// start data server
Tcp().bind(server, port) runForeach { conn =>
val tcpFlow = conn.flow
val serverProcessingFlow = Flow[ByteString].map { bs =>
println(s"Server received: [${bs.utf8String}]")
bs
}
.filter(bs => bs.utf8String.toUpperCase.contains("PING"))
.map(bs => ByteString("Pong"))
.via(tcpFlow)
val bidi = BidiFlow.fromGraph(GraphDSL.create() { implicit b =>
// construct and add the top flow, going outbound
val outbound = b.add(tcpFlow)
// construct and add the bottom flow, going inbound
val inbound = b.add(serverProcessingFlow)
BidiShape.fromFlows(outbound, inbound)
})
val flow = bidi.join(tcpFlow)
Source.tick(0 seconds, 2 seconds, ByteString("Hello"))
.map{bs => println(s"Sending to client = [${bs.utf8String}]"); bs}
.via(tcpFlow)
.runWith(Sink.ignore)
println(s"Setting handler for: [$conn]")
conn.handleWith(flow)
}
println(s"Server started - listening on $server:$port")
}
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ:
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
To post to this group, send email to [email protected].
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.