Hi again,

I'm not really sure how to fix this. I've had a hunt through the akka
sources and found that CommandFailed(Send(_), _) is returned if the
underlying UdpConnection instance is in the writePending state. The
UdpConnection seems to be implemented so as to process requests strictly in
series, with no buffering of messages yet to be sent.

Is the right way to handle this to have another actor that is managing a
messages-to-be-sent queue, which listens to send acknowledgements before
sending the next request? Will I get increased throughput by spawning
multiple UdpConnected instances to the same target address?

Thanks,
Matthew


On 2 June 2014 23:43, Matthew Pocock <[email protected]> wrote:

> Hi,
>
> I'm writing a very simple UDP server that echos messages back to a sender.
> The sender tries to spam the server. When I run the client, I get a lot of
> `Send(ByteString(......), NoAk(null))` responses. The UDP server literally
> just echos the content back. It isn't doing any blocking compute.
>
> Any ideas what I can do to get rid of these send failures? Code below.
>
> Matthew
>
> *import *java.net.InetSocketAddress*import *akka.actor.{Props, ActorSystem, 
> ActorRef, Actor}*import *akka.io.{UdpConnected, IO, Udp}*import 
> *akka.util.ByteString*import *scala.collection.immutable.Queue*object *Client 
> {  *def *main(args: Array[*String*]): Unit = {    *println*(*"Client starting 
> up"*)    *val **Array*(host, port) = args    *val *system = 
> *ActorSystem*(*"boo"*)    *val *ss = system.actorOf(*Props*(*new 
> *SendStuffConnected(*new *InetSocketAddress(host, port.toInt))))    
> *println*(*"Client started"*)    *this *synchronized {      *this *wait 1000  
>   }    *println*(*"Sending messages"*)    *for*(i <- 1 to 10000) {      ss ! 
> *f"**$*i*: Hi Mum"
> *      ss ! *f"**$*i*: Yo Dad"
> *    }    *println*(*"Client sent message"*)  }}*case class 
> *SendStuffConnected(address: InetSocketAddress) *extends *Actor {  *import 
> **context*.system  *IO*(UdpConnected) ! UdpConnected.*Connect*(*self*, 
> address)  *def *receive: *Receive *= awaitReady()    *def *awaitReady(msgs: 
> Queue[*String*] = Queue.*empty*): *Receive *= {    *case 
> *UdpConnected.Connected =>      *println*(*f"Client awaiting ready with 
> messages: **$*msgs*"*)      *context *become ready(sender())      msgs 
> foreach (*self *! _)    *case *msg: *String *=>      *println*(*f"Client 
> stashing message: **$*msg*"*)      *context *become awaitReady(msgs enqueue 
> msg)  }  *def *ready(connection: ActorRef): *Receive *= {    *case *msg: 
> *String *=>      *println*(*f"Client sending message: **$*msg*"*)      
> connection ! UdpConnected.*Send*(*ByteString*(msg))    *case 
> *UdpConnected.*Received*(data) =>      *println*(*f"Client received message 
> **$*{data.utf8String}*"*)    *case *d @ UdpConnected.Disconnect =>      
> connection ! d    *case *UdpConnected.Disconnected =>      *context *stop 
> *self
> *    *case *UdpConnected.*CommandFailed*(cmd) =>      cmd *match *{        
> *case *UdpConnected.*Send*(msg, ack) =>          *println*(*f"Failed sending 
> : **$*{msg.utf8String}*"*)        *case *_ =>          *println*(*f"Failed 
> with: **$*cmd*"*)      }      *println*(*f"Failed command: **$*cmd*"*)    
> *case *unexpected =>      *println*(*f"Client got unexpected message: 
> **$*unexpected*"*)  }  }
>
>
> *import *java.net.InetSocketAddress*import *akka.actor.{Props, ActorRef, 
> ActorSystem, Actor}*import *akka.io.{Udp, IO}*object *Server {  *def 
> *main(args: Array[*String*]): Unit = {    *println*(*"Server starting up"*)   
>  *val **Array*(host, port) = args    *val *system = *ActorSystem*(*"boo"*)    
> *val *rs = system.actorOf(*Props*(*new *ReceiveStuff(*new 
> *InetSocketAddress(host, port.toInt))))    *println*(*"Server started"*)  
> }}*class *ReceiveStuff(address: InetSocketAddress) *extends *Actor {  *import 
> **context*.system  *IO*(Udp) ! Udp.*Bind*(*self*, address)  *override def 
> *receive: *Receive *= {    *case *Udp.*Bound*(local) =>      
> *println*(*f"Server bound to **$*local*"*)      *context *become 
> ready(sender())    *case *unhandled =>      *println*(*f"Server received 
> unhandled message: **$*unhandled*"*)  }  *def *ready(socket: ActorRef): 
> *Receive *= {    *case *Udp.*Received*(data, remote) =>*//      
> println(f"Server received message: ${data.utf8String} from ${remote}")
> *      socket ! Udp.*Send*(data, remote) *// echo
> *    *case *Udp.Unbind =>      socket ! Udp.Unbind    *case *Udp.Unbound =>   
>    *context *stop *self
> *    *case *unhandled =>      *println*(*f"Server received unhandled message: 
> **$*unhandled*"*)  }}
>
> --
> Dr Matthew Pocock
> Turing ate my hamster LTD
> mailto: [email protected]
>
> Integrative Bioinformatics Group, School of Computing Science, Newcastle
> University
> mailto: [email protected]
>
> gchat: [email protected]
> msn: [email protected]
> irc.freenode.net: drdozer
> skype: matthew.pocock
> tel: (0191) 2566550
> mob: +447535664143
>



-- 
Dr Matthew Pocock
Turing ate my hamster LTD
mailto: [email protected]

Integrative Bioinformatics Group, School of Computing Science, Newcastle
University
mailto: [email protected]

gchat: [email protected]
msn: [email protected]
irc.freenode.net: drdozer
skype: matthew.pocock
tel: (0191) 2566550
mob: +447535664143

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To post to this group, send email to [email protected].
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to