Hello,

I am trying to implement the following use case as part of learning Akka: 
calculate the total number of streets in all the cities of all states. I have a 
database which contains all the states, cities and streets I need. I am trying 
to achieve this in Java.

Here is what I have:

*Configuration:*
  akka.actor.deployment {
    /CityActor {
      router = random-pool
      nr-of-instances = 10
    }
    /StateActor {
      router = random-pool
      nr-of-instances = 1
    }
  }


The intention of the above configuration is to process one state at a time and, 
within each state, to process 10 cities in parallel. If needed in the future, 
I would like to process multiple states in parallel as well.
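
To make the goal concrete, here is a rough plain-Java sketch of the behaviour I am after, written with java.util.concurrent rather than Akka. It is only an illustration and not my actor code: the class name StreetCountSketch and the countStreets helper are made up, and the fixed value of 3 streets per city is a stand-in for the real database lookup.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Plain-Java illustration only (no Akka): one state at a time, cities on up to 10 threads.
final class StreetCountSketch {

    // Same sample states as in my MasterActor.
    static List<String> getStates() {
        return Arrays.asList("CA", "MA", "TA", "NJ", "NY");
    }

    // Same sample cities as in my StateActor.
    static List<String> getCitiesForState(String stateName) {
        return Arrays.asList("Springfield-" + stateName, "Salem-" + stateName,
                "Franklin-" + stateName, "Clinton-" + stateName, "Georgetown-" + stateName);
    }

    // Hypothetical stand-in for the database lookup: pretend every city has 3 streets.
    static int countStreets(String city) {
        return 3;
    }

    static int countAllStreets() {
        // 10 worker threads, mirroring nr-of-instances = 10 for the city pool.
        ExecutorService cityPool = Executors.newFixedThreadPool(10);
        try {
            int total = 0;
            // States are handled sequentially, mirroring nr-of-instances = 1.
            for (String state : getStates()) {
                List<Future<Integer>> pending = new ArrayList<>();
                for (String city : getCitiesForState(state)) {
                    Callable<Integer> task = () -> countStreets(city);
                    pending.add(cityPool.submit(task));
                }
                // Wait for every city of this state before moving to the next state.
                for (Future<Integer> result : pending) {
                    total += result.get();
                }
            }
            return total;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException("Counting streets failed", e);
        } finally {
            // Shut the pool down only after all results have been collected.
            cityPool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("Total streets: " + countAllStreets());
    }
}
```

With the sample data above (5 states, 5 cities each, 3 streets per city) this adds up to 75. The part I do not know how to reproduce with actors is the ordering at the end: the pool is only shut down after every city result has come back.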

*Main:*
public static void main(String[] args) {
    try {
        Config conf = ConfigFactory
                .parseReader(new FileReader(
                        ClassLoader.getSystemResource("config/forum.conf").getFile()))
                .withFallback(ConfigFactory.load());

        System.out.println(conf);

        final ActorSystem system = ActorSystem.create("AkkaApp", conf);

        final ActorRef masterActor =
                system.actorOf(Props.create(MasterActor.class), "Migrate");

        masterActor.tell("", ActorRef.noSender());
    } catch (Exception e) {
        e.printStackTrace();
    }
}

*MasterActor:*
public class MasterActor extends UntypedActor {

    private final ActorRef randomRouter = getContext().system()
            .actorOf(Props.create(StateActor.class)
                    .withRouter(new akka.routing.FromConfig()), "StateActor");

    @Override
    public void onReceive(Object message) throws Exception {
        if (message instanceof String) {
            getContext().watch(randomRouter);

            for (String aState : getStates()) {
                randomRouter.tell(aState, getSelf());
            }
            randomRouter.tell(new Broadcast(PoisonPill.getInstance()), getSelf());
        } else if (message instanceof Terminated) {
            Terminated ater = (Terminated) message;

            if (ater.getActor().equals(randomRouter)) {
                getContext().system().terminate();
            }
        }
    }

    public List<String> getStates() {
        return new ArrayList<String>(Arrays.asList("CA", "MA", "TA", "NJ", "NY"));
    }
}

*StateActor:*
public class StateActor extends UntypedActor {

    private final ActorRef randomRouter = getContext().system()
            .actorOf(Props.create(CityActor.class)
                    .withRouter(new akka.routing.FromConfig()), "CityActor");

    @Override
    public void onReceive(Object message) throws Exception {
        if (message instanceof String) {
            System.out.println("Processing state " + message);

            for (String aCity : getCitiesForState((String) message)) {
                randomRouter.tell(aCity, getSelf());
            }
            Thread.sleep(1000);
        }
    }

    public List<String> getCitiesForState(String stateName) {
        return new ArrayList<String>(Arrays.asList(
                "Springfield-" + stateName, "Salem-" + stateName,
                "Franklin-" + stateName, "Clinton-" + stateName,
                "Georgetown-" + stateName));
    }
}


*CityActor:*
public class CityActor extends UntypedActor {

    @Override
    public void onReceive(Object message) throws Exception {
        if (message instanceof String) {
            System.out.println("Processing city " + message);
            Thread.sleep(1000);
        }
    }
}

Am I implementing this correctly?

I cannot get the program to shut down gracefully. I am not sure whether I 
implemented the PoisonPill logic properly. I get messages like "dead letters 
encountered".

Could you please help me with this?

I am a newbie to Akka, so please pardon my ignorance.

Thanks
