Hi,
I am looking for guidance on how to handle exceptions raised by an
akka-camel Producer endpoint. I am aware of Akka's supervision strategies,
but they do not seem to help in my case.
The setup:
I am using akka-camel (2.3.4) in conjunction with camel-rabbitmq (2.13.2)
to publish messages to a rabbitmq end-point. My producer is extremely
simple:
    class RabbitMqProducer(connection: RabbitMqConnection) extends Actor with Producer with Oneway {
      val endpointUri = connection.asUri
    }
I have a supervisor sitting on top of the producer actor that does the
following:
    override val supervisorStrategy = OneForOneStrategy() {
      case e: AkkaCamelException => Restart
    }
The problem:
Recently we had a RabbitMQ outage caused by a series of power failures.
After we brought the nodes back up, the RabbitMQ producers and consumers
never recovered on their own. The exception at the producer end was:
akka.camel.AkkaCamelException: clean connection shutdown; reason: Attempt to use closed channel
    at akka.camel.ProducerSupport$$anonfun$produce$1.applyOrElse(Producer.scala:73) ~[akka-camel_2.10-2.3.4.jar:na]
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465) ~[akka-actor_2.10-2.3.4.jar:na]
    at vajra.messaging.amqp.RabbitMqProducer.aroundReceive(rabbitmq.scala:19) ~[vajra-messaging_2.10.jar:1.0.6-SNAPSHOT]
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) [akka-actor_2.10-2.3.4.jar:na]
    at akka.actor.ActorCell.invoke(ActorCell.scala:487) [akka-actor_2.10-2.3.4.jar:na]
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) [akka-actor_2.10-2.3.4.jar:na]
    at akka.dispatch.Mailbox.run(Mailbox.scala:220) [akka-actor_2.10-2.3.4.jar:na]
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393) [akka-actor_2.10-2.3.4.jar:na]
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [scala-library-2.10.4.jar:na]
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [scala-library-2.10.4.jar:na]
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [scala-library-2.10.4.jar:na]
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [scala-library-2.10.4.jar:na]
Caused by: com.rabbitmq.client.AlreadyClosedException: clean connection shutdown; reason: Attempt to use closed channel
    at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:190) ~[amqp-client-3.2.2.jar:na]
    at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:291) ~[amqp-client-3.2.2.jar:na]
    at com.rabbitmq.client.impl.ChannelN.basicPublish(ChannelN.java:636) ~[amqp-client-3.2.2.jar:na]
    at com.rabbitmq.client.impl.ChannelN.basicPublish(ChannelN.java:619) ~[amqp-client-3.2.2.jar:na]
    at com.rabbitmq.client.impl.ChannelN.basicPublish(ChannelN.java:610) ~[amqp-client-3.2.2.jar:na]
    at org.apache.camel.component.rabbitmq.RabbitMQProducer.process(RabbitMQProducer.java:98) ~[camel-rabbitmq-2.13.2.jar:2.13.2]
    at org.apache.camel.util.AsyncProcessorConverterHelper$ProcessorToAsyncProcessorBridge.process(AsyncProcessorConverterHelper.java:61) ~[camel-core-2.13.2.jar:2.13.2]
    at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:113) ~[camel-core-2.13.2.jar:2.13.2]
    at akka.camel.ProducerSupport$ProducerChild.produce(Producer.scala:137) ~[akka-camel_2.10-2.3.4.jar:na]
    at akka.camel.ProducerSupport$ProducerChild$$anonfun$receive$1.applyOrElse(Producer.scala:111) ~[akka-camel_2.10-2.3.4.jar:na]
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465) ~[akka-actor_2.10-2.3.4.jar:na]
    at akka.camel.ProducerSupport$ProducerChild.aroundReceive(Producer.scala:108) ~[akka-camel_2.10-2.3.4.jar:na]
    ... 9 common frames omitted
Though we are working on making our RabbitMQ setup more robust, ideally I
want my application to recover automatically after such failures.
Unfortunately, as it stands now, the producers and consumers keep using
stale connections and the only way to recover is to restart the
applications. :(
Research:
- The akka.camel.ProducerSupport trait calls the register() method when a
producer is created. This registers the producer with the supervisor.
- akka.camel.internal.CamelSupervisor maintains a registry of producers.
Any new registration is added to this set (of actor-refs).
- akka.camel.internal.ProducerRegistrar is then invoked; it maintains a map
of ActorRef to a tuple of (EndPoint, SendProcessor). For a new
registration, a new entry is created in this map, which involves creating a
new EndPoint.
- So far so good.
- Now if RabbitMQ goes down, an AkkaCamelException is thrown, which kicks in
the supervision strategy. This causes the producer actor to be restarted.
- The new instance of the actor goes through its lifecycle hooks and
tries to register itself.
- However, the new instance sits behind the same ActorRef, and therefore
has the same hash-code, as the old instance. This is expected based on the
Akka documentation:
http://doc.akka.io/docs/akka/snapshot/scala/actors.html.
- This causes the registration process to be short-circuited because the
registries already contain the actor. Thus no new end-point is created, and
the restarted actor keeps using the old end-point from then on.
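To make the short-circuit concrete, here is a small self-contained model of
the behaviour described in the list above. It is only an illustration, not
akka-camel's actual code: Ref, Endpoint, Registrar and RegistryDemo are
hypothetical stand-ins, with Ref playing the role of the ActorRef that
survives a restart.

```scala
import scala.collection.mutable

// Hypothetical stand-ins; these are NOT akka-camel's real classes.
final case class Ref(path: String)   // plays the role of an ActorRef, unchanged by a restart
final class Endpoint(val id: Int)    // plays the role of the Camel endpoint and its connection

final class Registrar {
  private val endpoints = mutable.Map.empty[Ref, Endpoint]
  private var created = 0

  // Builds an endpoint only for a ref that has not been registered before.
  def register(ref: Ref): Endpoint =
    endpoints.getOrElseUpdate(ref, { created += 1; new Endpoint(created) })

  // Forgets the ref, so the next register call builds a fresh endpoint.
  def deregister(ref: Ref): Unit = endpoints -= ref
}

object RegistryDemo {
  // Returns the endpoint ids seen on first start, after a restart,
  // and after a stop followed by a re-registration.
  def run(): (Int, Int, Int) = {
    val registrar = new Registrar
    val ref = Ref("/user/supervisor/producer")
    val first = registrar.register(ref)          // initial start: endpoint 1
    val afterRestart = registrar.register(ref)   // restart: same ref, registration is a no-op
    registrar.deregister(ref)                    // stop: entry removed
    val afterStop = registrar.register(ref)      // re-registration: a new endpoint is built
    (first.id, afterRestart.id, afterStop.id)
  }
}
```

In this model RegistryDemo.run() yields (1, 1, 2): the restart path gets the
stale endpoint back, and only the deregister path produces a new one.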
Right now I have a very hacky solution in place where I just Stop and
recreate the producer actor. Stop effectively deregisters the actor, and by
recreating it I am able to redo the registration.
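A minimal sketch of that stop-and-recreate approach, assuming Akka 2.3.x
with akka-camel and camel-rabbitmq on the classpath. ProducerSupervisor is a
hypothetical name, and the RabbitMqConnection case class below is only a
stand-in for the real connection type, which is not shown in this post.

```scala
import akka.actor.{Actor, ActorRef, OneForOneStrategy, Props, Terminated}
import akka.actor.SupervisorStrategy.Stop
import akka.camel.{AkkaCamelException, Oneway, Producer}

// Hypothetical stand-in for the real RabbitMqConnection type.
final case class RabbitMqConnection(asUri: String)

class RabbitMqProducer(connection: RabbitMqConnection) extends Actor with Producer with Oneway {
  val endpointUri = connection.asUri
}

// Hypothetical supervisor: stops a failed producer instead of restarting it,
// then creates a brand-new child once the old one has terminated.
class ProducerSupervisor(connection: RabbitMqConnection) extends Actor {

  override val supervisorStrategy = OneForOneStrategy() {
    case _: AkkaCamelException => Stop
  }

  private var producer: ActorRef = spawn()

  // Unnamed child, so a replacement never clashes with the old child's name.
  // A new child has a new ActorRef and therefore goes through registration again.
  private def spawn(): ActorRef =
    context.watch(context.actorOf(Props(classOf[RabbitMqProducer], connection)))

  def receive = {
    case Terminated(ref) if ref == producer => producer = spawn()
    case msg                                => producer forward msg
  }
}
```

Note that in this sketch the message that triggered the failure, and any
message forwarded between the Stop and the Terminated notification, is not
retried; such messages end up in dead letters unless they are buffered
separately.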
I am wondering if anyone else has faced the same problem and whether there
is a better solution out there.
Thanks in advance for your input.
Regards,
Bharat