Actors are cheap, so you can start one actor per job (make them children of whichever actor manages the work); then you don't have that multiplexing issue at all.
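The isolation argument does not depend on Akka itself, so here is a rough plain-Java sketch of the same shape using only java.util.concurrent. It is not Akka code: `JobWorker` and `JobManager` are hypothetical names standing in for a per-job child actor and its managing parent, and the pool size of 4 and the 10 ms sleep are arbitrary assumptions. Each job gets its own worker instance, so a second job can never overwrite the first job's id, progress, or cancel flag.

```java
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for a per-job child actor: one instance per job,
// so jobId, progress and the cancel flag are never shared between jobs.
final class JobWorker implements Callable<String> {
    private final String jobId;
    private final int steps;
    private final AtomicBoolean cancelled = new AtomicBoolean(false);

    JobWorker(String jobId, int steps) {
        this.jobId = jobId;
        this.steps = steps;
    }

    void cancel() {
        cancelled.set(true);
    }

    @Override
    public String call() throws InterruptedException {
        for (int i = 0; i < steps; i++) {
            // Cooperative cancellation: checked between blocking steps.
            if (cancelled.get()) {
                return jobId + ":cancelled";
            }
            Thread.sleep(10); // stands in for one blocking SQL call
        }
        return jobId + ":finished";
    }
}

// Hypothetical stand-in for the managing (parent) actor.
final class JobManager {
    // Dedicated pool: blocking jobs only ever occupy these threads.
    private final ExecutorService blockingPool = Executors.newFixedThreadPool(4);
    private final Map<String, JobWorker> workers = new ConcurrentHashMap<>();

    Future<String> start(String jobId, int steps) {
        JobWorker worker = new JobWorker(jobId, steps);
        workers.put(jobId, worker);
        return blockingPool.submit(() -> {
            try {
                return worker.call();
            } finally {
                workers.remove(jobId, worker); // forget the worker once its job ends
            }
        });
    }

    // Cancellation is routed by job id, so it only reaches the matching worker.
    void cancel(String jobId) {
        JobWorker worker = workers.get(jobId);
        if (worker != null) {
            worker.cancel();
        }
    }

    void shutdown() {
        blockingPool.shutdown();
    }
}
```

Calling `start("0", 3)` and `start("1", 1000)` and then `cancel("1")` stops only job 1; job 0 runs to completion untouched. In Akka the same roles would be played by a parent actor that creates one child per `StartJob` and forwards `CancelJob` to the child for that id.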
Remember to run the blocking work on a dedicated thread pool dispatcher; the Akka docs section on dealing with blocking shows how to set that up.

--
Konrad Malawski

On January 26, 2018 at 3:56:05, Richard Gong ([email protected]) wrote:

> I'm a newbie in Akka. Here's one thing I'm trying to achieve, a simple job
> manager. There are jobs involving SQL queries and CPU-intensive computation.
> The jobs should be cancellable while they're running. Here's my code to
> simulate it. The worker actor sends a "continue" message to itself if there
> is no cancellation signal. However, the master actor may send another job to
> it while the current job is still running. The question is how to guarantee
> the jobs don't step on each other. This may not be the actor way of doing
> things. Any suggestions are very welcome.
>
> import java.io.IOException;
>
> import akka.actor.AbstractActor;
> import akka.actor.ActorRef;
> import akka.actor.ActorSystem;
> import akka.actor.Props;
> import akka.routing.RoundRobinPool;
>
> class StartJob {
>     private final String id;
>     private final int time;
>
>     public StartJob(String id, int time) {
>         super();
>         this.id = id;
>         this.time = time;
>     }
>
>     public String getId() {
>         return id;
>     }
>
>     public int getTime() {
>         return time;
>     }
>
> }
>
> class CancelJob {
>     private final String id;
>
>     public CancelJob(String id) {
>         super();
>         this.id = id;
>     }
>
>     public String getId() {
>         return id;
>     }
>
> }
>
> class Worker extends AbstractActor {
>
>     private String jobId;
>     private boolean cancelling;
>     private int time;
>     private int i;
>
>     @Override
>     public Receive createReceive() {
>         return receiveBuilder().match(StartJob.class, job -> {
>             jobId = job.getId();
>             cancelling = false;
>             i = 0;
>             time = job.getTime();
>             System.out.printf("Worker %s starts with id=%s\n", self(), job.getId());
>             self().tell("continue", self());
>         }).matchEquals("continue", p -> {
>             if (cancelling) {
>                 System.out.printf("Worker %s cancelled.\n", jobId);
>                 return;
>             }
>             System.out.printf("Worker %s progress %.0f%%\n", jobId, i * 100.0 / time);
>             Thread.sleep(1000); // simulate SQL query
>             for (int j=0; j<1000000; j++); // simulate CPU-intensive job
>             i++;
>             if (i < time) {
>                 self().tell("continue", self());
>             } else {
>                 System.out.printf("Worker %s finished.\n", self());
>             }
>         }).match(CancelJob.class, job -> job.getId().equals(jobId), job -> {
>             System.out.printf("Worker %s cancelling.\n", job.getId());
>             cancelling = true;
>         }).build();
>     }
>
> }
>
> class Master extends AbstractActor {
>
>     private final ActorRef workerRouter;
>
>     public Master(int numWorkers) {
>         workerRouter = this.getContext().actorOf(
>                 Props.create(Worker.class).withRouter(new RoundRobinPool(numWorkers)),
>                 "workerRouter");
>     }
>
>     @Override
>     public Receive createReceive() {
>         return receiveBuilder().match(StartJob.class, job -> {
>             System.out.printf("Submitting job %s\n", job.getId());
>             workerRouter.tell(job, self());
>         }).match(CancelJob.class, job -> {
>             System.out.printf("Cancelling job %s\n", job.getId());
>             getContext().actorSelection("workerRouter/*").tell(job, self());
>         }).build();
>     }
> }
>
> public class AnalyticsApp {
>     public static void main(String[] args) throws IOException, InterruptedException {
>         ActorSystem system = ActorSystem.create("JobManager");
>
>         ActorRef masterRef = system.actorOf(Props.create(Master.class, 2), "job-manager");
>         System.out.println("Master: " + masterRef);
>
>         for (int i = 0; i < 4; i++) {
>             masterRef.tell(new StartJob(Integer.toString(i), 5), ActorRef.noSender());
>         }
>
>         Thread.sleep(1000);
>         masterRef.tell(new CancelJob("1"), ActorRef.noSender());
>
>         System.out.println(">>> Press ENTER to exit <<<");
>         try {
>             System.in.read();
>         } finally {
>             system.terminate();
>         }
>
>     }
> }
>
> --
> >>>>>>>>>> Read the docs: http://akka.io/docs/
> >>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to [email protected].
> To post to this group, send email to [email protected].
> Visit this group at https://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
