I'm newbie in Akka. Here's one thing I'm trying to achieve, a simple job 
manager. There are jobs involving SQL queries and CPU intended computation. 
The jobs should be cancellable while they're running. Here's my code to 
simulate it. The worker actor sends "continue" message to itself if no 
cancellation signal. However, the master actor may sends another job to it 
while the current job is still running. Question is how to guarantee the 
jobs don't step on each other? This may not be the way Actors do. Any 
suggestions are very welcome. 

import java.io.IOException;

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.routing.RoundRobinPool;

class StartJob {
    private final String id;
    private final int time;

    public StartJob(String id, int time) {
        super();
        this.id = id;
        this.time = time;
    }

    public String getId() {
        return id;
    }

    public int getTime() {
        return time;
    }

}

class CancelJob {
    private final String id;

    public CancelJob(String id) {
        super();
        this.id = id;
    }

    public String getId() {
        return id;
    }

}

class Worker extends AbstractActor {

    private String jobId;
    private boolean cancelling;
    private int time;
    private int i;

    @Override
    public Receive createReceive() {
        return receiveBuilder().match(StartJob.class, job -> {
            jobId = job.getId();
            cancelling = false;
            i = 0;
            time = job.getTime();
            System.out.printf("Worker %s starts with id=%s\n", self(), 
job.getId());
            self().tell("continue", self());
        }).matchEquals("continue", p -> {
            if (cancelling) {
                System.out.printf("Worker %s cancelled.\n", jobId);
                return;
            }
            System.out.printf("Worker %s progress %.0f%%\n", jobId, i * 
100.0 / time);
            Thread.sleep(1000); // simulate SQL query
            for (int j=0; j<1000000; j++); // simulate CPU intended job
            i++;
            if (i < time) {
                self().tell("continue", self());
            } else {
                System.out.printf("Worker %s finished.\n", self());
            }
        }).match(CancelJob.class, job -> job.getId().equals(jobId), job -> {
            System.out.printf("Worker %s cancelling.\n", job.getId());
            cancelling = true;
        }).build();
    }

}

class Master extends AbstractActor {

    private final ActorRef workerRouter;

    public Master(int numWorkers) {
        workerRouter = 
this.getContext().actorOf(Props.create(Worker.class).withRouter(new 
RoundRobinPool(numWorkers)),
                "workerRouter");
    }

    @Override
    public Receive createReceive() {
        return receiveBuilder().match(StartJob.class, job -> {
            System.out.printf("Submitting job %s\n", job.getId());
            workerRouter.tell(job, self());
        }).match(CancelJob.class, job -> {
            System.out.printf("Cancelling job %s\n", job.getId());
            getContext().actorSelection("workerRouter/*").tell(job, self());
        }).build();
    }
}

public class AnalyticsApp {
    public static void main(String[] args) throws IOException, 
InterruptedException {
        ActorSystem system = ActorSystem.create("JobManager");

        ActorRef masterRef = system.actorOf(Props.create(Master.class, 2), 
"job-manager");
        System.out.println("Master: " + masterRef);

        for (int i = 0; i < 4; i++) {
            masterRef.tell(new StartJob(Integer.toString(i), 5), 
ActorRef.noSender());
        }

        Thread.sleep(1000);
        masterRef.tell(new CancelJob("1"), ActorRef.noSender());

        System.out.println(">>> Press ENTER to exit <<<");
        try {
            System.in.read();
        } finally {
            system.terminate();
        }

    }
}

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to