Hello,
I am trying to implement the following use case as part of learning Akka:
calculate the total number of streets in all the cities of all states. I have a
database that contains all the states, cities and streets I need. I am trying
to do this in Java.
Here is what I have so far.
*Configuration:*
akka.actor.deployment {
  /CityActor {
    router = random-pool
    nr-of-instances = 10
  }
  /StateActor {
    router = random-pool
    nr-of-instances = 1
  }
}
The intention of the above configuration is to process one state at a time and,
within each state, up to 10 cities in parallel. If needed in the future,
I would also like to process multiple states in parallel.
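To make the intent concrete, here is a rough sketch of the behaviour I am after, written with a plain java.util.concurrent thread pool instead of Akka. The names StreetCountSketch, totalStreets, citiesFor and countStreets are made up for this illustration, and countStreets returns a placeholder value rather than querying the database.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class StreetCountSketch {

    // Placeholder for the real database lookup; the returned value is made up.
    static int countStreets(String city) {
        return city.length();
    }

    // Same dummy city list as in the actor code of this post.
    static List<String> citiesFor(String state) {
        return Arrays.asList("Springfield-" + state, "Salem-" + state,
                "Franklin-" + state, "Clinton-" + state, "Georgetown-" + state);
    }

    // States are handled one after another; the cities of a state run on a
    // pool of cityParallelism threads (10 in the configuration above).
    static long totalStreets(List<String> states, int cityParallelism) {
        ExecutorService cityPool = Executors.newFixedThreadPool(cityParallelism);
        try {
            long total = 0;
            for (String state : states) {
                List<Future<Integer>> results = new ArrayList<>();
                for (String city : citiesFor(state)) {
                    Callable<Integer> task = () -> countStreets(city);
                    results.add(cityPool.submit(task));
                }
                // Wait for every city of this state before starting the next state.
                for (Future<Integer> result : results) {
                    total += result.get();
                }
            }
            return total;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while counting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("a city task failed", e);
        } finally {
            // All submitted tasks have completed by now, so this ends the pool cleanly.
            cityPool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<String> states = Arrays.asList("CA", "MA", "TA", "NJ", "NY");
        System.out.println("Total streets: " + totalStreets(states, 10));
    }
}
```

The point of the sketch is only the ordering: a state is finished once all of its cities have reported back, and the pool is shut down after the last state. That is the behaviour I would like the actors to reproduce.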
*Main:*
public static void main(String[] args) {
    try {
        Config conf = ConfigFactory
                .parseReader(new FileReader(
                        ClassLoader.getSystemResource("config/forum.conf").getFile()))
                .withFallback(ConfigFactory.load());
        System.out.println(conf);
        final ActorSystem system = ActorSystem.create("AkkaApp", conf);
        final ActorRef masterActor =
                system.actorOf(Props.create(MasterActor.class), "Migrate");
        masterActor.tell("", ActorRef.noSender());
    } catch (Exception e) {
        e.printStackTrace();
    }
}
*MasterActor:*
public class MasterActor extends UntypedActor {
    private final ActorRef randomRouter = getContext().system()
            .actorOf(Props.create(StateActor.class)
                    .withRouter(new akka.routing.FromConfig()), "StateActor");

    @Override
    public void onReceive(Object message) throws Exception {
        if (message instanceof String) {
            getContext().watch(randomRouter);
            for (String aState : getStates()) {
                randomRouter.tell(aState, getSelf());
            }
            randomRouter.tell(new Broadcast(PoisonPill.getInstance()), getSelf());
        } else if (message instanceof Terminated) {
            Terminated ater = (Terminated) message;
            if (ater.getActor().equals(randomRouter)) {
                getContext().system().terminate();
            }
        }
    }

    public List<String> getStates() {
        return new ArrayList<String>(Arrays.asList("CA", "MA", "TA", "NJ", "NY"));
    }
}
*StateActor:*
public class StateActor extends UntypedActor {
    private final ActorRef randomRouter = getContext().system()
            .actorOf(Props.create(CityActor.class)
                    .withRouter(new akka.routing.FromConfig()), "CityActor");

    @Override
    public void onReceive(Object message) throws Exception {
        if (message instanceof String) {
            System.out.println("Processing state " + message);
            for (String aCity : getCitiesForState((String) message)) {
                randomRouter.tell(aCity, getSelf());
            }
            Thread.sleep(1000);
        }
    }

    public List<String> getCitiesForState(String stateName) {
        return new ArrayList<String>(Arrays.asList(
                "Springfield-" + stateName, "Salem-" + stateName,
                "Franklin-" + stateName, "Clinton-" + stateName,
                "Georgetown-" + stateName));
    }
}
*CityActor:*
public class CityActor extends UntypedActor {
    @Override
    public void onReceive(Object message) throws Exception {
        if (message instanceof String) {
            System.out.println("Processing city " + message);
            Thread.sleep(1000);
        }
    }
}
Am I implementing this correctly?
I cannot get the program to shut down gracefully, and I am not sure whether I
implemented the PoisonPill logic properly. I get messages saying that dead
letters were encountered.
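One alternative I am considering, instead of relying on the PoisonPill broadcast, is to have the master keep track of outstanding work itself and terminate the system only once every city has been acknowledged. Below is a minimal plain-Java sketch of that bookkeeping. CompletionTracker and its methods are hypothetical names, not Akka API, and the sketch assumes that CityActor would send some acknowledgement back, which the code above does not do.

```java
import java.util.HashSet;
import java.util.Set;

// Bookkeeping for work that has been sent out but not yet acknowledged.
// Not thread-safe on purpose: it is meant to live inside a single actor,
// which processes one message at a time.
final class CompletionTracker {
    private final Set<String> pending = new HashSet<>();
    private boolean allSent = false;

    // Record that a work item (for example a city name) was dispatched.
    void sent(String item) {
        pending.add(item);
    }

    // Call once no further work items will be dispatched.
    void allWorkSent() {
        allSent = true;
    }

    // Record an acknowledgement; returns true when it is safe to shut down.
    boolean done(String item) {
        pending.remove(item);
        return isFinished();
    }

    // True only after all work was dispatched and every item was acknowledged.
    boolean isFinished() {
        return allSent && pending.isEmpty();
    }
}
```

With something like this, the master would call sent(...) for each city it hands out, allWorkSent() after the last state, and terminate the system only when done(...) returns true. I do not know whether this is the idiomatic way to do it in Akka.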
Could you please help me with this?
I am new to Akka, so please pardon my ignorance.
Thanks