In a conventional threaded application I would create a pool of threads. Here I
created something to mimic WorkerActor1.java being called three times.
I'm just starting to learn Akka, so could someone guide me on how I can have
it run the same calls on three possible threads?
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ:
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
To post to this group, send email to [email protected].
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.
package main;
import java.util.concurrent.TimeUnit;
import scala.concurrent.Await;
import scala.concurrent.duration.Duration;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.dispatch.*;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import akka.kernel.Bootable;
import akka.pattern.Patterns;
import akka.util.*;
public class ParserActor implements Bootable {
public static class Result {
}
LoggingAdapter log = null;
ActorSystem system = null;
public ParserActor() throws Exception {
System.out.println("Start");
system = ActorSystem.create("myApp");
log = Logging.getLogger(system, this);
log.info("Ok building supervisor ");
ActorRef supervisor = system.actorOf(new Props(main.SupervisorParserActor.class), "supervisor");
log.info("Starting supervisor with command start ");
supervisor.tell("start", supervisor);
log.info("Awaiting supervisor results ");
Object result = (Object) Await.result(
Patterns.ask(supervisor, new Result(), 5000),
Duration.create(5000, TimeUnit.MILLISECONDS));
log.info("Supervisor Value Recieved->" + result.toString());
assert result.equals(Integer.valueOf(8));
system.shutdown();
}
/**
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
new ParserActor();
}
public void shutdown() {
system.shutdown();
}
public void startup() {
}
}
package main;
import static akka.actor.SupervisorStrategy.escalate;
import static akka.actor.SupervisorStrategy.restart;
import static akka.actor.SupervisorStrategy.resume;
import static akka.actor.SupervisorStrategy.stop;
import java.util.concurrent.TimeUnit;
import scala.concurrent.duration.Duration;
import main.ParserActor.Result;
import akka.actor.ActorRef;
import akka.actor.AllForOneStrategy;
import akka.actor.OneForOneStrategy;
import akka.actor.Props;
import akka.actor.SupervisorStrategy;
import akka.actor.SupervisorStrategy.Directive;
import akka.actor.UntypedActor;
import akka.japi.Function;
import akka.util.*;
/**
 * Parent actor that creates three {@code WorkerActor1} children and fans
 * every message it receives out to all of them.
 *
 * <p>A child that fails with a {@link NullPointerException} is stopped; any
 * other failure is escalated to this actor's own parent.
 */
public class SupervisorParserActor extends UntypedActor {

    public final ActorRef workerActor1;
    public final ActorRef workerActor2;
    public final ActorRef workerActor3;

    /** Creates the three worker children under this actor's context. */
    public SupervisorParserActor() {
        workerActor1 = getContext().actorOf(new Props(main.WorkerActor1.class), "workerActor1");
        workerActor2 = getContext().actorOf(new Props(main.WorkerActor1.class), "workerActor2");
        workerActor3 = getContext().actorOf(new Props(main.WorkerActor1.class), "workerActor3");
    }

    /** One-for-one strategy configured with 1 retry inside a 10 second window. */
    private static final SupervisorStrategy strategy = new OneForOneStrategy(1,
            Duration.create(10, TimeUnit.SECONDS), new Function<Throwable, Directive>() {
                public Directive apply(Throwable t) {
                    if (t instanceof NullPointerException) {
                        return stop();
                    } else {
                        return escalate();
                    }
                }
            });

    @Override
    public SupervisorStrategy supervisorStrategy() {
        return strategy;
    }

    /**
     * Forwards {@code msg} to every worker.
     *
     * <p>{@code Result} requests keep the original sender so that workers
     * reply straight to the asker; every other message is sent with this
     * actor as the sender.
     *
     * @param msg the incoming message
     */
    @Override
    public void onReceive(Object msg) throws Exception {
        if (msg instanceof Result) {
            System.out.println("supervisor here " );
            tellAllWorkers(msg, getSender());
        } else {
            System.out.println("supervisor here 3 " );
            // Explicit sender instead of the deprecated single-argument tell(msg).
            tellAllWorkers(msg, getSelf());
        }
    }

    /** Sends {@code msg} to the three workers, in order, on behalf of {@code sender}. */
    private void tellAllWorkers(Object msg, ActorRef sender) {
        workerActor1.tell(msg, sender);
        workerActor2.tell(msg, sender);
        workerActor3.tell(msg, sender);
    }
}
package main;
import java.util.Random;
import main.ParserActor.Result;
import akka.actor.UntypedActor;
import akka.event.Logging;
import akka.event.LoggingAdapter;
/**
 * Worker that, on the "start" command, stores a state string of the form
 * {@code "stop <n>"} and, on a {@code Result} request, sends that state back
 * to the requester.
 */
public class WorkerActor1 extends UntypedActor {

    LoggingAdapter log = Logging.getLogger(getContext().system(), this);

    /** Last computed state; stays {@code null} until "start" has been handled. */
    private Object state;

    /** Reused across messages instead of allocating a generator per "start". */
    private final Random random = new Random();

    @Override
    public void preStart() {
        log.info("Starting WorkerActor instance hashcode #" + this.hashCode());
    }

    /**
     * Handles one message.
     *
     * @param o a {@code Result} request, or a String command (only "start"
     *          has an effect; other strings are ignored)
     * @throws NullPointerException     if {@code o} is null
     * @throws IllegalArgumentException if {@code o} is neither a
     *                                  {@code Result} nor a String
     */
    @Override
    public void onReceive(Object o) throws Exception {
        // Check for null before touching the message, so the descriptive
        // exception below is raised rather than a bare NPE from the log call.
        if (o == null) {
            throw new NullPointerException("Null Value Passed");
        }
        log.info("WorkerActor1 onReceive called " + o);

        if (o instanceof Result) {
            // Reply to whoever sent the request.
            // NOTE(review): state is still null if no "start" was processed
            // before this request; confirm how the requester should be answered then.
            log.info("WorkerActor telling " + state);
            getSender().tell(state, getSelf());
        } else if (o instanceof String) {
            if (o.equals("start")) {
                // Stand-in for real work: a random number in 1..56 inclusive.
                int randomNum = random.nextInt(56) + 1;
                state = "stop " + randomNum;
            }
        } else {
            throw new IllegalArgumentException("Wrong Arguement");
        }
    }

    @Override
    public void postStop() {
        log.info("Stopping WorkerActor1 instance hashcode #" + this.hashCode());
    }
}