Hi,

I am looking for guidance on how to handle exceptions from an akka-camel 
Producer endpoint. I am aware of Akka's supervision strategy; however, it 
does not seem to work in my case. 

The setup:
I am using akka-camel (2.3.4) in conjunction with camel-rabbitmq (2.13.2) 
to publish messages to a RabbitMQ endpoint. My producer is extremely 
simple:
import akka.actor.Actor
import akka.camel.{Oneway, Producer}

class RabbitMqProducer(connection: RabbitMqConnection) extends Actor with Producer with Oneway {
  val endpointUri = connection.asUri
}

I have a supervisor sitting on top of the producer actor that does the 
following:
  override val supervisorStrategy = OneForOneStrategy() {
    case e: AkkaCamelException => Restart
  }

The problem:
Recently we had a RabbitMQ outage due to a series of power outages. When 
we brought the nodes back up, the RabbitMQ producers and consumers never 
recovered automatically. The exception at the producer end was:
akka.camel.AkkaCamelException: clean connection shutdown; reason: Attempt to use closed channel
    at akka.camel.ProducerSupport$$anonfun$produce$1.applyOrElse(Producer.scala:73) ~[akka-camel_2.10-2.3.4.jar:na]
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465) ~[akka-actor_2.10-2.3.4.jar:na]
    at vajra.messaging.amqp.RabbitMqProducer.aroundReceive(rabbitmq.scala:19) ~[vajra-messaging_2.10.jar:1.0.6-SNAPSHOT]
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) [akka-actor_2.10-2.3.4.jar:na]
    at akka.actor.ActorCell.invoke(ActorCell.scala:487) [akka-actor_2.10-2.3.4.jar:na]
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) [akka-actor_2.10-2.3.4.jar:na]
    at akka.dispatch.Mailbox.run(Mailbox.scala:220) [akka-actor_2.10-2.3.4.jar:na]
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393) [akka-actor_2.10-2.3.4.jar:na]
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [scala-library-2.10.4.jar:na]
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [scala-library-2.10.4.jar:na]
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [scala-library-2.10.4.jar:na]
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [scala-library-2.10.4.jar:na]
Caused by: com.rabbitmq.client.AlreadyClosedException: clean connection shutdown; reason: Attempt to use closed channel
    at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:190) ~[amqp-client-3.2.2.jar:na]
    at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:291) ~[amqp-client-3.2.2.jar:na]
    at com.rabbitmq.client.impl.ChannelN.basicPublish(ChannelN.java:636) ~[amqp-client-3.2.2.jar:na]
    at com.rabbitmq.client.impl.ChannelN.basicPublish(ChannelN.java:619) ~[amqp-client-3.2.2.jar:na]
    at com.rabbitmq.client.impl.ChannelN.basicPublish(ChannelN.java:610) ~[amqp-client-3.2.2.jar:na]
    at org.apache.camel.component.rabbitmq.RabbitMQProducer.process(RabbitMQProducer.java:98) ~[camel-rabbitmq-2.13.2.jar:2.13.2]
    at org.apache.camel.util.AsyncProcessorConverterHelper$ProcessorToAsyncProcessorBridge.process(AsyncProcessorConverterHelper.java:61) ~[camel-core-2.13.2.jar:2.13.2]
    at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:113) ~[camel-core-2.13.2.jar:2.13.2]
    at akka.camel.ProducerSupport$ProducerChild.produce(Producer.scala:137) ~[akka-camel_2.10-2.3.4.jar:na]
    at akka.camel.ProducerSupport$ProducerChild$$anonfun$receive$1.applyOrElse(Producer.scala:111) ~[akka-camel_2.10-2.3.4.jar:na]
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465) ~[akka-actor_2.10-2.3.4.jar:na]
    at akka.camel.ProducerSupport$ProducerChild.aroundReceive(Producer.scala:108) ~[akka-camel_2.10-2.3.4.jar:na]
    ... 9 common frames omitted

Though we are working on making our RabbitMQ setup more robust, ideally I 
want my application to recover automatically after such failures. 
Unfortunately, as it stands now, the producers and consumers keep using 
stale connections, and the only way to recover is to restart the 
applications. :(

Research:
- The akka.camel.ProducerSupport trait calls the register() method when a 
producer is created. This registers the producer with the supervisor.
- akka.camel.internal.CamelSupervisor maintains a registry of producers. 
Any new registration is added to this set (of actor refs).
- akka.camel.internal.ProducerRegistrar is then called; it maintains a map 
from ActorRef to a tuple of (Endpoint, SendProcessor). For a new 
registration, a new entry is created in this map, which involves creating a 
new Endpoint.
- So far so good.
- Now if RabbitMQ goes down, an AkkaCamelException is thrown, which kicks in 
the supervision strategy. This causes the producer actor to be restarted.
- The new instance of the actor goes through its lifecycle hooks and 
tries to register itself.
- However, the new instance has the same ActorRef, and hence the same hash 
code, as the old instance. This is expected based on the Akka documentation: 
http://doc.akka.io/docs/akka/snapshot/scala/actors.html.
- This causes the registration to be short-circuited because the 
registries already contain the actor. Thus no new endpoint is created, and 
the producer keeps using the old endpoint from then on.
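
To make the short-circuit described in the list above concrete, here is a toy model in plain Scala. It is not akka-camel code: Ref, ToyEndpoint and ToyRegistrar are hypothetical names that only mimic a registry keyed by actor reference, under the assumption that a restart keeps the same reference while a stop followed by a recreate yields a new one.

```scala
import scala.collection.mutable

// Toy stand-ins only; none of these are akka-camel or Camel types.
final case class Ref(path: String) // plays the role of an ActorRef
final class ToyEndpoint            // plays the role of a Camel endpoint

class ToyRegistrar {
  private val endpoints = mutable.Map.empty[Ref, ToyEndpoint]

  // An endpoint is created only if the ref is not registered yet.
  def register(ref: Ref): ToyEndpoint =
    endpoints.getOrElseUpdate(ref, new ToyEndpoint)

  def deregister(ref: Ref): Unit = {
    endpoints.remove(ref)
    ()
  }
}

object ToyRegistrarDemo extends App {
  val registrar = new ToyRegistrar
  val ref = Ref("/user/supervisor/producer")
  val first = registrar.register(ref)

  // "Restart": the same ref registers again and gets the old endpoint back.
  assert(registrar.register(ref) eq first)

  // "Stop and recreate": deregister, then register under a new ref.
  registrar.deregister(ref)
  val fresh = registrar.register(Ref("/user/supervisor/producer-2"))
  assert(!(fresh eq first))
}
```

In this model the only way to get a fresh endpoint is to remove the old entry first, which matches what I observe with the real registries.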

Right now I have a very hacky solution in place where I just Stop and 
recreate the producer actor. Stop effectively deregisters the actor, and by 
recreating it I am able to redo the registration. 
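
For reference, a minimal sketch of that stop-and-recreate workaround, assuming akka-actor and akka-camel 2.3.x on the classpath. RabbitMqProducerSupervisor is a hypothetical name, and the RabbitMqConnection case class is only a stand-in for my real connection type. Messages that arrive while the producer is being replaced, and the message that triggered the failure, are not retried here.

```scala
import akka.actor.{Actor, ActorRef, OneForOneStrategy, Props, Terminated}
import akka.actor.SupervisorStrategy.Stop
import akka.camel.{AkkaCamelException, Oneway, Producer}

// Hypothetical stand-in for the real connection settings type.
case class RabbitMqConnection(asUri: String)

class RabbitMqProducer(connection: RabbitMqConnection) extends Actor with Producer with Oneway {
  val endpointUri = connection.asUri
}

// Hypothetical supervisor: stops a failed producer and creates a new one.
class RabbitMqProducerSupervisor(connection: RabbitMqConnection) extends Actor {

  // Stop instead of Restart, so the failed producer is terminated.
  override val supervisorStrategy = OneForOneStrategy() {
    case _: AkkaCamelException => Stop
  }

  private var producer: ActorRef = createProducer()

  // Create an unnamed child and watch it so Terminated is delivered when it stops.
  private def createProducer(): ActorRef =
    context.watch(context.actorOf(Props(classOf[RabbitMqProducer], connection)))

  def receive = {
    case Terminated(ref) if ref == producer =>
      // The new child has a new ActorRef, so it registers from scratch.
      producer = createProducer()
    case msg =>
      producer forward msg
  }
}
```

The child is left unnamed on purpose so that recreating it never collides with the name of the actor that just stopped.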

I am wondering if anyone else has faced the same problem and if there is a 
better solution out there.

Thanks in advance for your input.

Regards,
Bharat
