Hi, When I try those exact sources it does gracefully shutdown after processing all of them.
Your what you want it to do does not match what it actually does however, if you want each state to process sequentially in an actor you will need to have some protocol for completion of the processing so that the top level actor does not send the next state until the previous one was completed. And for the state actor it needs to know when all its cities are done processing. A few additional general notes: Don't use standard library types such as String as messages unless you have a very good reason to, create your own message classes, using generic classes makes applications hard to follow and is error prone. I see you make the threads sleep, it is very important to never do that, as the dispatcher (thread pool) that the actors are dispatched on has few threads, this way you will starve the thread pool with the first n actors that run, and no other actor can run until their sleeps has completed. -- Johan Akka Team On Mon, Mar 27, 2017 at 4:45 AM, <[email protected]> wrote: > Hello, > > I am trying to achieve this use case part of my learning exercise of akka. > Calculate total number of streets in all the cities of all states. I have a > database which contains all states, cities and streets i need. I am trying > to achieve this in Java > > Here is what i have > > *Configuration:* > akka.actor.deployment { > /CityActor{ > router = random-pool > nr-of-instances = 10 > } > /StateActor { > router = random-pool > nr-of-instances = 1 > } > } > > > Intention of the above configuration is process one state at a time and > within each state 10 cities are processed in parallel. If needed in future, > i would like to process multiple states in parallel as well. > > *Main:* > public static void main(String[] args) { > > try { > > Config conf = ConfigFactory > .parseReader( > new FileReader(ClassLoader.getSystemResource("config/ > forum.conf").getFile())) > .withFallback(ConfigFactory.load()); > > System.out.println(conf); > > final ActorSystem system = ActorSystem.create("AkkaApp", conf); > > final ActorRef masterActor = system.actorOf(Props.create(MasterActor.class), > "Migrate"); > > masterActor.tell("", ActorRef.noSender()); > > } catch (Exception e) { > e.printStackTrace(); > } > } > > *MasterActor:* > public class MasterActor extends UntypedActor { > > private final ActorRef randomRouter = getContext().system() > .actorOf(Props.create(StateActor.class).withRouter(new > akka.routing.FromConfig()), "StateActor"); > > @Override > public void onReceive(Object message) throws Exception { > > if (message instanceof String) { > > getContext().watch(randomRouter); > > for (String aState : getStates()) { > randomRouter.tell(aState, getSelf()); > } > randomRouter.tell(new Broadcast(PoisonPill.getInstance()), getSelf()); > > } else if (message instanceof Terminated) { > > Terminated ater = (Terminated) message; > > if (ater.getActor().equals(randomRouter)) { > getContext().system().terminate(); > } > } > } > > public List<String> getStates() { > return new ArrayList<String>(Arrays.asList("CA", "MA", "TA", "NJ", "NY")); > }; > > } > > *StateActor:* > public class StateActor extends UntypedActor { > > private final ActorRef randomRouter = getContext().system() > .actorOf(Props.create(CityActor.class).withRouter(new > akka.routing.FromConfig()), "CityActor"); > > @Override > public void onReceive(Object message) throws Exception { > if (message instanceof String) { > System.out.println("Processing state " + message); > > for (String aCity : getCitiesForState((String) message)) { > randomRouter.tell(aCity, getSelf()); > } > Thread.sleep(1000); > } > } > > public List<String> getCitiesForState(String stateName) { > return new ArrayList<String>(Arrays.asList("Springfield-" + stateName, > "Salem-" + stateName, > "Franklin-" + stateName, "Clinton-" + stateName, "Georgetown-" + > stateName)); > }; > } > > > *CityActor:* > public class CityActor extends UntypedActor { > > @Override > public void onReceive(Object message) throws Exception { > if (message instanceof String) { > System.out.println("Processing city " + message); > Thread.sleep(1000); > } > } > } > > Am i implementing this correctly? > > I cannot get the program to shutdown gracefully. Not sure whether i > implemented the PosionPill logic properly. I get message like dead letters > encountered. > > Could you please help me with this. > > I am a newbie to akka, so please pardon my ignorance > > Thanks > > -- > >>>>>>>>>> Read the docs: http://akka.io/docs/ > >>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/ > current/additional/faq.html > >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user > --- > You received this message because you are subscribed to the Google Groups > "Akka User List" group. > To unsubscribe from this group and stop receiving emails from it, send an > email to [email protected]. > To post to this group, send email to [email protected]. > Visit this group at https://groups.google.com/group/akka-user. > For more options, visit https://groups.google.com/d/optout. > -- >>>>>>>>>> Read the docs: http://akka.io/docs/ >>>>>>>>>> Check the FAQ: >>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups "Akka User List" group. To unsubscribe from this group and stop receiving emails from it, send an email to [email protected]. To post to this group, send email to [email protected]. Visit this group at https://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout.
