Hi,
I am trying out a competing event consumer implementation using Akka and
Camel. Am using Akka 2.3.2 and Camel 5.8.0. I am connecting camel to
ActiveMQ broker and using a producer to generate messages from other end.
In the following code EventManager is the master which creates pool of
consumers and EventProcessor is the message processing actor.
EventManager.java
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.camel.component.ActiveMQComponent;
import org.apache.camel.CamelContext;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.camel.Camel;
import akka.camel.CamelExtension;
import akka.japi.Creator;
import akka.routing.RoundRobinPool;
public class EventManager {
private final ActorSystem akkaSystem;
private CamelContext camelContext = null;
private ActorRef workRouter;
public EventManager(ActorSystem system) {
akkaSystem = system;
initialize();
}
public void initialize() {
Camel camel = CamelExtension.get(akkaSystem);
camelContext = camel.context();
ActiveMQComponent activemqComponent =
ActiveMQComponent.activeMQComponent("tcp://localhost:61616");
activemqComponent.setDeliveryPersistent(false);
camelContext.addComponent("activemq",activemqComponent );
int numOfWorkers = 5;
// distributing the message processing across a pool of 5 actors
ActorRef workRouter =
akkaSystem.actorOf(new
RoundRobinPool(numOfWorkers).props(Props.create(EventProcessor.class)),
"workRouter");
}
}
EventProcessor.java
import org.apache.log4j.Logger;
import com.wipro.sif.controller.event.ControllerEvent;
import akka.actor.UntypedActor;
import akka.camel.CamelMessage;
import akka.camel.javaapi.UntypedConsumerActor;
public class EventProcessor extends UntypedConsumerActor{
private static final Logger LOGGER =
Logger.getLogger(EventProcessor.class);
public EventProcessor() {
}
public void onReceive(Object message) {
if(message instanceof CamelMessage) {
CamelMessage camelMessage = (CamelMessage) message;
String body = camelMessage.getBodyAs(String.class, getCamelContext());
LOGGER.info("Message handled by :" +this.getSelf().path().name());
LOGGER.info("Message body:" + body);
}
}
public boolean autoAck() {
return true;
}
public String getEndpointUri() {
return "activemq:queue:dest";
}
}
The problem I am seeing is that the messages seems to be consumed by a
single actor and not getting distributed across the pool. Do I need to
create a separate camel route to distribute ? I would also like to
distribute the processing across different physical nodes. Appreciate your
inputs and best practices.
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ:
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
To post to this group, send email to [email protected].
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.