Hi all,
I'm working on implementing retry functionality in one of my actor's that
will retry writing a message to RabbitMQ if the original write fails. This
looks something like this:
private String retryMessage;
private void processMessage(String message) {
try {
writer.write(message);
} catch (IOException e) {
LOGGER.warn("Unable to write message to RabbitMQ. Retrying...");
retryMessage = message;
context().become(retry);
sendTimeout();
}
}
private PartialFunction<Object, BoxedUnit> retry = ReceiveBuilder
.match(String.class, message -> stash())
.match(ReceiveTimeout.class, receiveTimeout -> retryMessage())
.build();
private void sendTimeout() {
long waitTime = getWaitTime();
context().setReceiveTimeout(Duration.Undefined());
context().setReceiveTimeout(Duration.create(waitTime,
TimeUnit.MILLISECONDS));
}
private void retryMessage() {
try {
writer.write(retryMessage);
context().setReceiveTimeout(Duration.Undefined());
context().unbecome();
unstashAll();
} catch (IOException e) {
LOGGER.warn("Unable to write message to RabbitMQ. Retrying...");
sendTimeout();
}
}
As you can see, when the write fails, the actor switches to a "retry" mode.
In this retry mode, it will stash any incoming messages that are meant to
be written until it can successfully write the original message that it
failed on. To do a retry, I set a ReceiveTimeout that whose duration
increases exponentially with each failure. When the actor receives a
ReceiveTimeout, it simply tries to write it again. Now the part that I'm
curious about has to do with unstashing all of the messages and unbecoming
its retry mode.
In the retryMessage() method, if the write is successful, the
ReceiveTimeout will be shut off, the actor will revert back to its original
state, and any messages that were stashed during this time will be
unstashed. In terms of reverting back to its original state and unstashing
messages, is this the right order in which to do this? In all of the
examples that I've seen, I noticed that the unstashAll() always came before
the context().unbecome() which is what I originally had. After giving it
some thought, however, I started to wonder if having this order would cause
me to lose any messages if they arrived in between the unstashAll() and
context().unbecome() operations thus causing them to get stashed but never
becoming unstashed again. This is why I ended up reversing their order. Is
my thinking correct?
Luis
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ:
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
To post to this group, send email to [email protected].
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.