Hi all!
I have an actor that uses Akka IO to publish messages to a UDP port
(potentially other transports later).
In order to test this, I wanted to send it a message and wait for the UDP
packet to be sent. However, messages received before the actor is connected
(or the socket is set up, etc.) are, and should be, ignored. This means that
in my "connected" test case I need to wait for the socket to become ready
(in this case, for the actor to receive the Udp.SimpleSenderReady message).
I have got something working, but it is rather ugly, and I'm looking for
the best way of doing this.
What I've got is this actor (the url parameter is only used in logging,
which I omitted for brevity):
import java.net.InetSocketAddress
import scala.collection.immutable
import akka.actor.{Actor, ActorRef}
import akka.io.{IO, Udp, UdpMessage}

object PublisherProtocol {
  final case class MetricsUpdate(timestamp: Long, counters: immutable.Map[Entity, Long])
}

class UdpPublisher(destinationAddress: InetSocketAddress, url: String) extends Actor {
  import context.system
  import PublisherProtocol._

  val encoder = MessageEncoder.jsonEncoder()

  // Ask the UDP manager for a simple sender; it replies with Udp.SimpleSenderReady
  override def preStart(): Unit = {
    IO(Udp) ! UdpMessage.simpleSender
  }

  // Encode every counter and send each one as its own datagram
  def sendMetrics(timestamp: Long, metrics: collection.Map[Entity, Long], connection: ActorRef) = {
    for {
      (entity, value) <- metrics
      bytes = encoder.encode(entity.name, timestamp, value, entity.tags)
    } {
      connection ! UdpMessage.send(bytes, target = destinationAddress)
    }
  }

  // Initial behaviour: anything other than SimpleSenderReady is ignored
  override def receive = {
    case Udp.SimpleSenderReady => context.become(connected(sender()))
  }

  def connected(connection: ActorRef): Receive = {
    case MetricsUpdate(timestamp, counters) =>
      sendMetrics(timestamp, counters, connection)
  }
}
And this mixin trait:
package akka.teststuff

import akka.actor.Actor

trait MessageIntercept extends Actor {
  override protected[akka] def aroundReceive(receive: Actor.Receive, msg: Any): Unit = {
    val newMsg = intercept(msg)
    super.aroundReceive(receive, newMsg)
  }

  def intercept(msg: Any): Any = ???
}
I test it like this (most test framework gunk omitted for brevity):
import PublisherProtocol._
val channel = UdpPublisherTest.createChannel
val address = channel.socket.getLocalSocketAddress.asInstanceOf[InetSocketAddress]
val latch = new CountDownLatch(1)
val props = UdpPublisherTest.publisherProps(address, latch)
val publisher = system.actorOf(props)
// await returns false on timeout; fail here instead of blocking forever in readJson
latch.await(1, TimeUnit.SECONDS) should be(true)
publisher ! MetricsUpdate(1234, Map(
  Entity("name", "counter", Map("tag1" -> "value1")) -> 321
))
val data = UdpPublisherTest.readJson(channel)
data should be(Json.obj(
  "timestamp" -> Json.toJson(1.234),
  "type" -> Json.toJson("agg_metric"),
  "name" -> Json.toJson("name"),
  "value" -> Json.toJson(321),
  "tags" -> Json.obj("tag1" -> Json.toJson("value1"))))
}
object UdpPublisherTest {
  private val MAX_JSON_SIZE = 1024

  def parseJson(byteBuffer: ByteBuffer) = {
    val bytes = new Array[Byte](byteBuffer.remaining())
    byteBuffer.get(bytes)
    Json.parse(bytes)
  }

  def readJson(channel: DatagramChannel): JsValue = {
    val buffer = ByteBuffer.allocate(MAX_JSON_SIZE)
    channel.receive(buffer)
    buffer.flip()
    parseJson(buffer)
  }

  def createChannel: DatagramChannel = {
    val channel = DatagramChannel.open()
    channel.socket.bind(new InetSocketAddress("127.0.0.1", 0))
    channel
  }

  def publisherProps(address: InetSocketAddress, latch: CountDownLatch) = {
    val url = s"udp://${address.getHostString}:${address.getPort}"
    Props(new UdpPublisher(address, url) with MessageIntercept {
      override def intercept(msg: Any): Any = {
        msg match {
          case Udp.SimpleSenderReady => latch.countDown()
          case _ =>
        }
        msg
      }
    })
  }
}
The test passes, but it seems to me there ought to be a nicer way to do it.
I am both an Akka and Scala noob, so all suggestions and arguments are
appreciated.
So far the following alternatives have been considered (and have not
necessarily been rejected yet):
1. Send a message from the actor when it becomes ready
   - I don't want to add testing code to the production actor
2. Override receive instead of aroundReceive
   - That would change the behaviour of the actor and will also only work
     for a subset of test cases
   - The test code becomes less legible than I would like, but I'm not
     sure it's less so than the proposed MessageIntercept variant
   - Can be combined with 1, so that's nice
3. Use stash()/unstashAll() in the receive method
   - That would work and has some nice properties, but it could cause a
     queue buildup if there are connection problems (which wouldn't be an
     issue in the UDP case, but might be for the general case)
4. Add Thread.sleep(1000) to the test
   - Well, that would probably work, but it doesn't really scale well :)
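
To make alternative 3 concrete, here is a rough sketch of what I have in mind.
It is simplified and untested against my real code: Payload and
StashingUdpSender are made-up names standing in for MetricsUpdate and
UdpPublisher, and the encoding step is left out. Messages that arrive before
Udp.SimpleSenderReady are stashed and replayed once the sender is ready.

```scala
import java.net.InetSocketAddress
import akka.actor.{Actor, ActorRef, Stash}
import akka.io.{IO, Udp}
import akka.util.ByteString

// Hypothetical message type, standing in for MetricsUpdate
final case class Payload(bytes: ByteString)

// Hypothetical actor, standing in for UdpPublisher
class StashingUdpSender(destination: InetSocketAddress) extends Actor with Stash {
  import context.system

  // Request a simple sender; the UDP manager answers with Udp.SimpleSenderReady
  override def preStart(): Unit = IO(Udp) ! Udp.SimpleSender

  // Not ready yet: keep payloads in the stash instead of dropping them
  def receive: Receive = {
    case Udp.SimpleSenderReady =>
      unstashAll() // stashed payloads go back to the front of the mailbox
      context.become(connected(sender()))
    case _: Payload =>
      stash()
  }

  // Ready: forward every payload to the sender actor as a datagram
  def connected(connection: ActorRef): Receive = {
    case Payload(bytes) => connection ! Udp.Send(bytes, destination)
  }
}
```

With this variant the test could send its message right after actorOf without
waiting, but the stash keeps growing for as long as the socket is not ready,
which is the queue buildup mentioned above.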