Hi again, I'm not really sure how to fix this. I've had a hunt through the Akka sources and found that `CommandFailed(Send(_), _)` is returned if the underlying UdpConnection instance is in the writePending state. The UdpConnection seems to be implemented to process requests strictly in series, with no buffering of messages yet to be sent.
Is the right way to handle this to have another actor that is managing a messages-to-be-sent queue, which listens for send acknowledgements before sending the next request? Will I get increased throughput by spawning multiple UdpConnected instances to the same target address? Thanks, Matthew On 2 June 2014 23:43, Matthew Pocock <[email protected]> wrote: > Hi, > > I'm writing a very simple UDP server that echoes messages back to a sender. > The sender tries to spam the server. When I run the client, I get a lot of > `Send(ByteString(......), NoAck(null))` responses. The UDP server literally > just echoes the content back. It isn't doing any blocking compute. > > Any ideas what I can do to get rid of these send failures? Code below. > > Matthew > > *import *java.net.InetSocketAddress*import *akka.actor.{Props, ActorSystem, > ActorRef, Actor}*import *akka.io.{UdpConnected, IO, Udp}*import > *akka.util.ByteString*import *scala.collection.immutable.Queue*object *Client > { *def *main(args: Array[*String*]): Unit = { *println*(*"Client starting > up"*) *val **Array*(host, port) = args *val *system = > *ActorSystem*(*"boo"*) *val *ss = system.actorOf(*Props*(*new > *SendStuffConnected(*new *InetSocketAddress(host, port.toInt)))) > *println*(*"Client started"*) *this *synchronized { *this *wait 1000 > } *println*(*"Sending messages"*) *for*(i <- 1 to 10000) { ss ! > *f"**$*i*: Hi Mum" > * ss ! *f"**$*i*: Yo Dad" > * } *println*(*"Client sent message"*) }}*case class > *SendStuffConnected(address: InetSocketAddress) *extends *Actor { *import > **context*.system *IO*(UdpConnected) ! UdpConnected.*Connect*(*self*, > address) *def *receive: *Receive *= awaitReady() *def *awaitReady(msgs: > Queue[*String*] = Queue.*empty*): *Receive *= { *case > *UdpConnected.Connected => *println*(*f"Client awaiting ready with > messages: **$*msgs*"*) *context *become ready(sender()) msgs > foreach (*self *! 
_) *case *msg: *String *=> *println*(*f"Client > stashing message: **$*msg*"*) *context *become awaitReady(msgs enqueue > msg) } *def *ready(connection: ActorRef): *Receive *= { *case *msg: > *String *=> *println*(*f"Client sending message: **$*msg*"*) > connection ! UdpConnected.*Send*(*ByteString*(msg)) *case > *UdpConnected.*Received*(data) => *println*(*f"Client received message > **$*{data.utf8String}*"*) *case *d @ UdpConnected.Disconnect => > connection ! d *case *UdpConnected.Disconnected => *context *stop > *self > * *case *UdpConnected.*CommandFailed*(cmd) => cmd *match *{ > *case *UdpConnected.*Send*(msg, ack) => *println*(*f"Failed sending > : **$*{msg.utf8String}*"*) *case *_ => *println*(*f"Failed > with: **$*cmd*"*) } *println*(*f"Failed command: **$*cmd*"*) > *case *unexpected => *println*(*f"Client got unexpected message: > **$*unexpected*"*) } } > > > *import *java.net.InetSocketAddress*import *akka.actor.{Props, ActorRef, > ActorSystem, Actor}*import *akka.io.{Udp, IO}*object *Server { *def > *main(args: Array[*String*]): Unit = { *println*(*"Server starting up"*) > *val **Array*(host, port) = args *val *system = *ActorSystem*(*"boo"*) > *val *rs = system.actorOf(*Props*(*new *ReceiveStuff(*new > *InetSocketAddress(host, port.toInt)))) *println*(*"Server started"*) > }}*class *ReceiveStuff(address: InetSocketAddress) *extends *Actor { *import > **context*.system *IO*(Udp) ! Udp.*Bind*(*self*, address) *override def > *receive: *Receive *= { *case *Udp.*Bound*(local) => > *println*(*f"Server bound to **$*local*"*) *context *become > ready(sender()) *case *unhandled => *println*(*f"Server received > unhandled message: **$*unhandled*"*) } *def *ready(socket: ActorRef): > *Receive *= { *case *Udp.*Received*(data, remote) =>*// > println(f"Server received message: ${data.utf8String} from ${remote}") > * socket ! Udp.*Send*(data, remote) *// echo > * *case *Udp.Unbind => socket ! 
Udp.Unbind *case *Udp.Unbound => > *context *stop *self > * *case *unhandled => *println*(*f"Server received unhandled message: > **$*unhandled*"*) }} > > -- > Dr Matthew Pocock > Turing ate my hamster LTD > mailto: [email protected] > > Integrative Bioinformatics Group, School of Computing Science, Newcastle > University > mailto: [email protected] > > gchat: [email protected] > msn: [email protected] > irc.freenode.net: drdozer > skype: matthew.pocock > tel: (0191) 2566550 > mob: +447535664143 > -- Dr Matthew Pocock Turing ate my hamster LTD mailto: [email protected] Integrative Bioinformatics Group, School of Computing Science, Newcastle University mailto: [email protected] gchat: [email protected] msn: [email protected] irc.freenode.net: drdozer skype: matthew.pocock tel: (0191) 2566550 mob: +447535664143 -- >>>>>>>>>> Read the docs: http://akka.io/docs/ >>>>>>>>>> Check the FAQ: >>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups "Akka User List" group. To unsubscribe from this group and stop receiving emails from it, send an email to [email protected]. To post to this group, send email to [email protected]. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
