Hi all!

I have an actor that use Akka IO to publish messages to a UDP port 
(potentially other transports later).

In order to test this, I wanted to send it a message and wait for the UDP 
packet to be sent. However, messages received before the actor is connected 
(or the socket is set up etc) are, and should be, ignored. This means that 
in my "connected" test case I need to wait for the socket to become ready 
(in this case, receive the Udp.SimpleSender message).

I have got something working, but it is rather ugly, and I'm looking for 
the best way of doing this.

What I've got is this actor (the url parameter is only used in logging, 
which I omitted for brevity):

object PublisherProtocol {
  final case class MetricsUpdate(timestamp: Long, counters: 
immutable.Map[Entity, Long])
}

class UdpPublisher(destinationAddress: InetSocketAddress, url: String) 
extends Actor {
  import context.system
  import PublisherProtocol._

  val encoder = MessageEncoder.jsonEncoder()

  override def preStart = {
    IO(Udp) ! UdpMessage.simpleSender
  }

  def sendMetrics(timestamp: Long, metrics: collection.Map[Entity, Long], 
connection: ActorRef) = {
    for {
      (entity, value) <- metrics
      bytes = encoder.encode(entity.name, timestamp, value, entity.tags)
    } {
      connection ! UdpMessage.send(bytes, target = destinationAddress)
    }
  }

  override def receive = {
    case Udp.SimpleSenderReady => context.become(connected(sender()))
  }

  def connected(connection : ActorRef) : Receive = {
    case MetricsUpdate(timestamp, counters) => {
      sendMetrics(timestamp, counters, connection)
    }
  }
}

And this mixin trait:

package akka.teststuff

import akka.actor.Actor

trait MessageIntercept extends Actor {
  override protected[akka] def aroundReceive(receive: Actor.Receive, msg: 
Any): Unit = {
    val newMsg = intercept(msg)
    super.aroundReceive(receive, newMsg)
  }

  def intercept(msg: Any): Any = ???
}



That I test like this (most test framework gunk omitted for brevity):

import PublisherProtocol._

val channel = UdpPublisherTest.createChannel
val address = channel.socket.getLocalSocketAddress.asInstanceOf[
InetSocketAddress]
val latch = new CountDownLatch(1)
val props = UdpPublisherTest.publisherProps(address, latch)

val publisher = system.actorOf(props)

latch.await(1, TimeUnit.SECONDS)
publisher ! MetricsUpdate(1234, Map(
  Entity("name", "counter", Map("tag1" -> "value1")) -> 321)
)

val data = UdpPublisherTest.readJson(channel)
data should be(Json.obj(
  "timestamp" -> Json.toJson(1.234),
  "type" -> Json.toJson("agg_metric"),
  "name" -> Json.toJson("name"),
  "value" -> Json.toJson(321),
  "tags" -> Json.obj("tag1" -> Json.toJson("value1"))))
}



object UdpPublisherTest {

  private val MAX_JSON_SIZE = 1024

  def parseJson(byteBuffer: ByteBuffer) = {
    val bytes = new Array[Byte](byteBuffer.remaining())
    byteBuffer.get(bytes)

    Json.parse(bytes)
  }

  def readJson(channel: DatagramChannel): JsValue = {
    val buffer = ByteBuffer.allocate(MAX_JSON_SIZE)
    channel.receive(buffer)
    buffer.flip()
    parseJson(buffer)
  }

  def createChannel: DatagramChannel = {
    val channel = DatagramChannel.open()
    channel.socket.bind(new InetSocketAddress("127.0.0.1", 0))
    channel
  }

  def publisherProps(address: InetSocketAddress, latch: CountDownLatch) = {
    val url = s"udp://${address.getHostString}:${address.getPort}"
    Props(new UdpPublisher(address, url) with MessageIntercept {
      override def intercept(msg: Any): Any = {
        msg match {
          case Udp.SimpleSenderReady => latch.countDown()
          case _ =>
        }
        msg
      }
    })
  }
}


The test passes, but it seems to me there ought to be a nicer way to do it.

I am both an Akka and Scala noob, so all suggestions and arguments are 
appreciated.

So far the following alternatives have been considered (and have not 
necessarily been rejected yet):

   1. Send a message from the actor when it becomes ready
   - I don't want to add testing code to the production actor
      2. Override receive instead of aroundReceive
   - That would change the behaviour of the actor and will also only work 
      for a subset of test cases
      - The test code becomes less legible than I would like - but I'm not 
      sure it's less so than than the proposed MessageIntercept variant
      - Can be combined with 1, so that's nice
   3. Use stash()/unstas() in the receive method
      - That would work and has some nice properties, but it could cause a 
      queue buildup if there are connection problems (which wouldn't be an 
issue 
      in the UDP case, but might be for the general case)
   4. Add Thread.sleep(1000) to the test
   - Well, that would probably work, doesn't really scale well :)
   

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To post to this group, send email to [email protected].
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to