Hi, 

I am trying out a competing event consumer implementation using Akka and 
Camel. Am using Akka 2.3.2 and Camel 5.8.0. I am connecting camel to 
ActiveMQ broker and using a producer to generate messages from other end. 
In the following code EventManager is the master which creates pool of 
consumers and EventProcessor is the message processing actor. 

EventManager.java

        import org.apache.activemq.ActiveMQConnection;
        import org.apache.activemq.camel.component.ActiveMQComponent;
        import org.apache.camel.CamelContext;
        
        import akka.actor.ActorRef;
        import akka.actor.ActorSystem;
        import akka.actor.Props;
        import akka.actor.UntypedActor;
        
        import akka.camel.Camel;
        import akka.camel.CamelExtension;
        import akka.japi.Creator;
        import akka.routing.RoundRobinPool;
        
        public class EventManager {
        
        
        
        
        private final ActorSystem akkaSystem;
        
        private CamelContext camelContext = null;
        
        private ActorRef workRouter;
        
        public EventManager(ActorSystem system) {
        akkaSystem = system;
        
        initialize();
        }
        
        public void initialize() {
        
        Camel camel = CamelExtension.get(akkaSystem);
        
        camelContext = camel.context();
        
        ActiveMQComponent activemqComponent = 
ActiveMQComponent.activeMQComponent("tcp://localhost:61616");
        activemqComponent.setDeliveryPersistent(false);
        camelContext.addComponent("activemq",activemqComponent );
        
        
        int numOfWorkers = 5;
    
    // distributing the message processing across a pool of 5 actors
        ActorRef workRouter =
          akkaSystem.actorOf(new 
RoundRobinPool(numOfWorkers).props(Props.create(EventProcessor.class)), 
            "workRouter");
        
        }
        
        }

EventProcessor.java

    import org.apache.log4j.Logger;
    
    import com.wipro.sif.controller.event.ControllerEvent;
    
    import akka.actor.UntypedActor;
    import akka.camel.CamelMessage;
    import akka.camel.javaapi.UntypedConsumerActor;
    
    public class EventProcessor extends UntypedConsumerActor{
    
    private static final Logger LOGGER = 
Logger.getLogger(EventProcessor.class);
    public EventProcessor() {
    
    }
    
    public void onReceive(Object message) {
    if(message instanceof CamelMessage) {
    CamelMessage camelMessage = (CamelMessage) message;
    String body = camelMessage.getBodyAs(String.class, getCamelContext());
    LOGGER.info("Message handled by :" +this.getSelf().path().name());
    
    LOGGER.info("Message body:" + body);
    }
    
    
    }
    
    public boolean autoAck() {
    return true;
    }
    public String getEndpointUri() {
    return "activemq:queue:dest";
    }
    
    }

The problem I am seeing is that the messages seems to be consumed by a 
single actor and not getting distributed across the pool. Do I need to 
create a separate camel route to distribute ? I would also like to 
distribute the processing across different physical nodes. Appreciate your 
inputs and best practices. 

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To post to this group, send email to [email protected].
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to