On Wednesday, August 26, 2015 at 3:36:34 AM UTC+2, Simon Schäfer wrote:
>
> There is another thing that just came up: Can I receive data on the client
> with the above approach when there was no prior request?
>
> I made a small code example:
>
I just solved this problem. The new code for the server:
/**
 * Starts a TCP server that answers every received chunk with its reverse and,
 * two seconds later, pushes an unsolicited "server test" notification to the
 * same client.
 *
 * All traffic of a connection is routed through an actor: incoming bytes reach
 * it via `Sink.actorRef`, and it writes to the client by sending messages to
 * the `ActorRef` materialized by `Source.actorRef`. Because the actor may send
 * at any time, the server can push data without a preceding request.
 *
 * @param system  actor system used to run the actors and the streams
 * @param address interface to bind to
 * @param port    TCP port to bind to
 */
def server(system: ActorSystem, address: String, port: Int): Unit = {
  implicit val sys = system
  import system.dispatcher
  implicit val materializer = ActorMaterializer()

  // Builds the handling flow for ONE connection. A fresh actor is created per
  // connection; sharing a single actor would let every newly materialized
  // connection overwrite `_sender`, so replies would reach only the latest
  // client.
  def connectionFlow() = {
    val actor = system.actorOf(Props(new Actor {
      // ActorRef materialized by this connection's Source.actorRef; it is sent
      // to this actor by `mapMaterializedValue` below.
      var _sender: ActorRef = _

      override def receive = {
        case data: ByteString =>
          println("server received: " + data)
          // Copy the reference: the Future body runs outside the actor and
          // must not read the actor's mutable state.
          val s = _sender
          Future {
            // Demo-only delay to simulate a later, unsolicited notification.
            Thread.sleep(2000)
            println("sending notification")
            s ! ByteString("server test")
          }
          s ! data.reverse
        case ref: ActorRef =>
          _sender = ref
        // "exit" is the completion message configured on Sink.actorRef below;
        // a Status.Failure is delivered when the stream fails. Either way the
        // connection is over, so release the per-connection actor.
        case "exit" | _: akka.actor.Status.Failure =>
          context.stop(self)
      }
    }))

    val inbound = Flow[ByteString].to(Sink.actorRef(actor, "exit"))
    // NOTE(review): buffer size 1 with OverflowStrategy.fail presumably fails
    // the stream if the actor sends faster than the socket drains — confirm
    // this is acceptable outside of a demo.
    val outbound = Source.actorRef[ByteString](1, OverflowStrategy.fail)
      .mapMaterializedValue(actor ! _)
    Flow.wrap(inbound, outbound)(Keep.none)
  }

  val handler = Sink.foreach[Tcp.IncomingConnection] { conn =>
    println("Client connected from: " + conn.remoteAddress)
    conn handleWith connectionFlow()
  }

  val connections = Tcp().bind(address, port)
  val binding = connections.to(handler).run()

  binding.onComplete {
    case Success(b) =>
      println("Server started, listening on: " + b.localAddress)
    case Failure(e) =>
      println(s"Server could not bind to $address:$port: ${e.getMessage}")
  }
}
The key was to redirect all the data through an actor, which can send as
many messages as it wants.
>
> import scala.concurrent.Future
> import scala.util._
>
> import akka.actor._
> import akka.stream._, scaladsl._
> import akka.util._
>
> object Test extends App {
>
> def server(system: ActorSystem, address: String, port: Int): Unit = {
> implicit val sys = system
> import system.dispatcher
> implicit val materializer = ActorMaterializer()
>
> val flow = Flow[ByteString].map{bs ⇒
> println("server received: " + bs)
> bs
> }
>
> val handler = Sink.foreach[Tcp.IncomingConnection] { conn =>
> println("Client connected from: " + conn.remoteAddress)
> Future {
> Thread.sleep(2000)
> val a = Source.actorRef(1, OverflowStrategy.fail).via(conn.flow)
> .to(Sink.foreach{x ⇒ println("server sink: " + x)}).run()
> println("sending notification")
> a ! ByteString("server test")
> }
> conn handleWith flow
> }
>
> val connections = Tcp().bind(address, port)
> val binding = connections.to(handler).run()
>
> binding.onComplete {
> case Success(b) =>
> println("Server started, listening on: " + b.localAddress)
> case Failure(e) =>
> println(s"Server could not bind to $address:$port: ${e.getMessage}")
> }
> }
>
> def client(system: ActorSystem, address: String, port: Int): Unit = {
> implicit val sys = system
> import system.dispatcher
> implicit val materializer = ActorMaterializer()
>
> val tcpFlow = Flow[ByteString].via(Tcp().outgoingConnection(address,
> port))
> val a = Source.actorRef(1, OverflowStrategy.fail).via(tcpFlow)
> .to(Sink.foreach{x ⇒ println("client sink: " + x)}).run()
> a ! ByteString("client test")
> }
>
> val ser = ActorSystem("Server")
> server(ser, "127.0.0.1", 6666)
>
> val client = ActorSystem("Client")
> client(client, "127.0.0.1", 6666)
> }
>
> I would expect that when the future is completed, the client receives
> a notification, but it does not happen. Instead I get a message in dead
> letters. Not sure why. Am I allowed to send to `conn.flow`, or do I have to
> do it differently?
>
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ:
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
To post to this group, send email to [email protected].
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.