Hi Matthew,

you are right that the current implementation is not optimal. For TCP we always optimistically try to write unless we know that it won't work, which means you can benefit from NACK-based flow control instead of waiting for every ACK. For UDP this has not been implemented yet; I have created a ticket. If you want to give it a try, there is a very reasonable chance that you'll beat us to it ;-)

Regards,

Roland

On 4 Jun 2014 at 21:19, Matthew Pocock <[email protected]> wrote:

> Hi again,
>
> I'm not really sure how to fix this. I've had a hunt through the akka sources
> and found that CommandFailed(Send(_), _) is returned if the underlying
> UdpConnection instance is in the writePending state. The UdpConnection seems
> to be implemented so as to process requests strictly in series, with no
> buffering of messages yet to be sent.
>
> Is the right way to handle this to have another actor that is managing a
> messages-to-be-sent queue, which listens to send acknowledgements before
> sending the next request? Will I get increased throughput by spawning
> multiple UdpConnected instances to the same target address?
>
> Thanks,
> Matthew
>
>
> On 2 June 2014 23:43, Matthew Pocock <[email protected]> wrote:
> Hi,
>
> I'm writing a very simple UDP server that echoes messages back to a sender.
> The sender tries to spam the server. When I run the client, I get a lot of
> `Send(ByteString(......), NoAck(null))` responses. The UDP server literally
> just echoes the content back. It isn't doing any blocking compute.
>
> Any ideas what I can do to get rid of these send failures? Code below.
>
> Matthew
>
> import java.net.InetSocketAddress
>
> import akka.actor.{Props, ActorSystem, ActorRef, Actor}
> import akka.io.{UdpConnected, IO, Udp}
> import akka.util.ByteString
>
> import scala.collection.immutable.Queue
>
> object Client {
>   def main(args: Array[String]): Unit = {
>     println("Client starting up")
>
>     val Array(host, port) = args
>
>     val system = ActorSystem("boo")
>     val ss = system.actorOf(Props(new SendStuffConnected(
>       new InetSocketAddress(host, port.toInt))))
>
>     println("Client started")
>
>     this synchronized {
>       this wait 1000
>     }
>
>     println("Sending messages")
>
>     for(i <- 1 to 10000) {
>       ss ! f"$i: Hi Mum"
>       ss ! f"$i: Yo Dad"
>     }
>
>     println("Client sent message")
>   }
> }
>
> case class SendStuffConnected(address: InetSocketAddress) extends Actor {
>   import context.system
>   IO(UdpConnected) ! UdpConnected.Connect(self, address)
>
>   def receive: Receive = awaitReady()
>
>   def awaitReady(msgs: Queue[String] = Queue.empty): Receive = {
>     case UdpConnected.Connected =>
>       println(f"Client awaiting ready with messages: $msgs")
>       context become ready(sender())
>       msgs foreach (self ! _)
>     case msg: String =>
>       println(f"Client stashing message: $msg")
>       context become awaitReady(msgs enqueue msg)
>   }
>
>   def ready(connection: ActorRef): Receive = {
>     case msg: String =>
>       println(f"Client sending message: $msg")
>       connection ! UdpConnected.Send(ByteString(msg))
>     case UdpConnected.Received(data) =>
>       println(f"Client received message ${data.utf8String}")
>     case d @ UdpConnected.Disconnect =>
>       connection ! d
>     case UdpConnected.Disconnected =>
>       context stop self
>     case UdpConnected.CommandFailed(cmd) =>
>       cmd match {
>         case UdpConnected.Send(msg, ack) =>
>           println(f"Failed sending : ${msg.utf8String}")
>         case _ =>
>           println(f"Failed with: $cmd")
>       }
>       println(f"Failed command: $cmd")
>     case unexpected =>
>       println(f"Client got unexpected message: $unexpected")
>   }
>
> }
>
> import java.net.InetSocketAddress
>
> import akka.actor.{Props, ActorRef, ActorSystem, Actor}
> import akka.io.{Udp, IO}
>
> object Server {
>   def main(args: Array[String]): Unit = {
>     println("Server starting up")
>
>     val Array(host, port) = args
>
>     val system = ActorSystem("boo")
>     val rs = system.actorOf(Props(new ReceiveStuff(
>       new InetSocketAddress(host, port.toInt))))
>
>     println("Server started")
>   }
> }
>
> class ReceiveStuff(address: InetSocketAddress) extends Actor {
>   import context.system
>   IO(Udp) ! Udp.Bind(self, address)
>
>   override def receive: Receive = {
>     case Udp.Bound(local) =>
>       println(f"Server bound to $local")
>       context become ready(sender())
>     case unhandled =>
>       println(f"Server received unhandled message: $unhandled")
>   }
>
>   def ready(socket: ActorRef): Receive = {
>     case Udp.Received(data, remote) =>
>       // println(f"Server received message: ${data.utf8String} from ${remote}")
>       socket ! Udp.Send(data, remote) // echo
>     case Udp.Unbind =>
>       socket ! Udp.Unbind
>     case Udp.Unbound =>
>       context stop self
>     case unhandled =>
>       println(f"Server received unhandled message: $unhandled")
>   }
> }
>
> --
> Dr Matthew Pocock
> Turing ate my hamster LTD
> mailto: [email protected]
>
> Integrative Bioinformatics Group, School of Computing Science, Newcastle
> University
> mailto: [email protected]
>
> gchat: [email protected]
> msn: [email protected]
> irc.freenode.net: drdozer
> skype: matthew.pocock
> tel: (0191) 2566550
> mob: +447535664143
>
> --
> >>>>>>>>>> Read the docs: http://akka.io/docs/
> >>>>>>>>>> Check the FAQ:
> >>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to [email protected].
> To post to this group, send email to [email protected].
> Visit this group at http://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.

Dr. Roland Kuhn
Akka Tech Lead
Typesafe – Reactive apps on the JVM.
twitter: @rolandkuhn
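
The messages-to-be-sent queue that Matthew asks about can be sketched without any Akka dependency as a small state machine. This is only an illustration under stated assumptions, not how Akka implements UDP writes: `SendQueue`, `offer`, and `acked` are hypothetical names. In an actor, `offer` would presumably be called when a `String` arrives, `acked` when the ack token passed to `UdpConnected.Send(payload, ack)` comes back, and any returned message would be forwarded to the connection.

```scala
import scala.collection.immutable.Queue

// Hypothetical helper, not part of Akka: allows at most one write in flight
// and buffers everything else until the previous write is acknowledged.
final case class SendQueue(pending: Queue[String] = Queue.empty,
                           inFlight: Boolean = false) {

  // A new message arrives. Returns the new state and, if no write is in
  // flight, the message that should be handed to the connection now.
  def offer(msg: String): (SendQueue, Option[String]) =
    if (inFlight) (copy(pending = pending.enqueue(msg)), None)
    else (copy(inFlight = true), Some(msg))

  // The previous write was acknowledged. Returns the new state and the next
  // buffered message to write, if there is one.
  def acked: (SendQueue, Option[String]) =
    pending.dequeueOption match {
      case Some((next, rest)) => (SendQueue(rest, inFlight = true), Some(next))
      case None               => (SendQueue(), None)
    }
}

object SendQueueDemo {
  def main(args: Array[String]): Unit = {
    val (s1, w1) = SendQueue().offer("1: Hi Mum") // idle, so write immediately
    val (s2, w2) = s1.offer("1: Yo Dad")          // write in flight, so buffer
    val (s3, w3) = s2.acked                       // ack releases the buffered message
    val (_, w4)  = s3.acked                       // nothing left to write
    println(Seq(w1, w2, w3, w4))
  }
}
```

Because only one datagram is ever outstanding, the connection should never be asked to write while a write is pending, at the cost of one ack round trip per message inside the actor system.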
