Oh! Yep, that was it. I didn't know sender() doesn't work outside of the
receive block! Thanks!
On Tuesday, June 17, 2014 3:01:28 PM UTC-4, Thomas Lockney wrote:
>
> Here’s your problem:
>
> static class DataWorker extends AbstractActor {
>
> ...
>
> private final ActorRef sender = sender();
>
>
> sender() is only meaningful once you’re inside the receive block, while a
> message is being acted upon. With your current code, it’s guaranteed that
> the sender field will always give you the dead letters reference, as it’s a
> final reference captured at construction time, before any message (and
> therefore any sender) exists. Move that statement to just inside the receive
> block, right after you match on the message, creating a final reference to
> it there, and you should be good to go.
>
>
>
> On Jun 17, 2014, at 11:12 AM, Chanan Braunstein <[email protected]
> <javascript:>> wrote:
>
> I figured the same, so I did the following:
>
> private final ActorRef sender = sender();
> private final ActorRef self = self();
> private final ActorContext context = context();
> private final ActorRef parent = context.parent();
>
> This did not help, though. That is also why I logged self, sender and
> parent as the first line in the constructor of the DataWorker class below,
> prior to any extra async work (other than the usual Akka machinery).
>
> Let me try to paste part of the class here and just remove the parts that
> don't matter. Hope that is enough, yet not too big to be annoying:
>
> public class GridActor2 extends AbstractActorWithStash {
> private Optional<GridFullHeader> gridFullHeader = Optional.empty();
> private final GridService gridService;
>
> private final Cluster cluster =
> Cluster.builder().addContactPoint("localhost").build();
> private final Session session = cluster.connect("gradebook");
>
> public GridActor2(final String name, final GridService gridService) {
> this.gridService = gridService;
> Logger.debug("start grid actor");
> receive(
> ReceiveBuilder.
> matchAny(obj -> {
> if (!gridFullHeader.isPresent()) {
> stash();
> final ActorRef actor =
> context().actorOf(Props.create(GridActor2.Worker.class, session, null,
> this.gridService), "init");
> actor.tell(new GridFullHeaderRequest(name), self());
> context().become(
> ReceiveBuilder.
> match(GridFullHeaderResponse.class, response
> -> {
>
> Logger.debug("GridFullHeaderResponse!!!!!");
> gridFullHeader =
> Optional.of(response.header);
> unstashAll();
> context().unbecome();
> }).matchAny(o -> stash())
> .build()
> );
> } else {
> final ActorRef worker =
> context().actorOf(Props.create(GridActor2.Worker.class, session,
> gridFullHeader.get(), this.gridService));
> worker.forward(obj, context());
> }
> }).
> build()
> );
> }
>
> @Override
> public void postStop() {
> session.closeAsync();
> }
>
> static class Worker extends AbstractActor {
> private final GridFullHeader header;
> private final GridService gridService;
>
> //For gridHeader
> private Optional<List<Student>> students = Optional.empty();
> private Optional<List<Assignment>> assignments = Optional.empty();
> private Optional<UpdateTimeHolder> updateTimeHolder =
> Optional.empty();
>
> public Worker(Session session, GridFullHeader gridFullHeader,
> GridService gridService) {
> this.header = gridFullHeader;
> this.gridService = gridService;
>
> Logger.debug("Start worker");
>
> receive(
> ReceiveBuilder.
> match(GridFullHeaderRequest.class, request -> {
> Logger.debug("GridFullHeaderRequest");
> final ActorRef worker1 =
> context().actorOf(Props.create(DataWorker.class, session), "w1");
> final ActorRef worker2 =
> context().actorOf(Props.create(DataWorker.class, session), "w2");
> final ActorRef worker3 =
> context().actorOf(Props.create(DataWorker.class, session), "w3");
>
> worker1.tell(new SelectStudents(request.courseId),
> self());
> worker2.tell(new
> SelectAssignments(request.courseId), self());
> worker3.tell(new UpdateTimes(request.courseId),
> self());
>
> context().become(
> ReceiveBuilder.
> match(SelectStudentsResponse.class,
> response -> {
> students =
> Optional.of(response.students);
> if(students.isPresent() &&
> assignments.isPresent() && updateTimeHolder.isPresent()) {
> GridFullHeader header = new
> GridFullHeader(response.courseId, students.get(), assignments.get(),
> updateTimeHolder.get());
> context().parent().tell(new
> GridFullHeaderResponse(response.courseId, header), self());
> }
> }).
> match(SelectAssignmentsResponse.class,
> response -> {
> assignments =
> Optional.of(response.assignments);
> if(students.isPresent() &&
> assignments.isPresent() && updateTimeHolder.isPresent()) {
> GridFullHeader header = new
> GridFullHeader(response.courseId, students.get(), assignments.get(),
> updateTimeHolder.get());
> context().parent().tell(new
> GridFullHeaderResponse(response.courseId, header), self());
> }
> }).
> match(UpdateTimesResponse.class, response
> -> {
> updateTimeHolder =
> Optional.of(response.updateTimeHolder);
> if(students.isPresent() &&
> assignments.isPresent() && updateTimeHolder.isPresent()) {
> GridFullHeader header = new
> GridFullHeader(response.courseId, students.get(), assignments.get(),
> updateTimeHolder.get());
> context().parent().tell(new
> GridFullHeaderResponse(response.courseId, header), self());
> }
> }).
> build());
> }).build());
> }
> }
>
> static class DataWorker extends AbstractActor {
> private final BoundStatement selectStudents;
> private final BoundStatement selectAssignments;
> private final BoundStatement updateTimes;
> private final ActorRef sender = sender();
> private final ActorRef self = self();
> private final ActorContext context = context();
> private final ActorRef parent = context.parent();
>
> public DataWorker(Session session) {
> //Logging was done here
> this.selectStudents = new
> BoundStatement(session.prepare("SELECT student_id, firstname, lastname,
> average FROM gradebook.students WHERE course_id = ?;"));
> this.selectAssignments = new
> BoundStatement(session.prepare("SELECT assignment_id, name, average,
> ordernum FROM gradebook.assignments WHERE course_id = ?;"));
> this.updateTimes = new BoundStatement(session.prepare("SELECT
> last_assignment_update, last_student_update, last_grade_update FROM
> gradebook.update_times WHERE course_id = ?;"));
> receive(
> ReceiveBuilder.match(SelectStudents.class, request -> {
> Logger.debug("SelectStudents");
> selectStudents.setString("course_id",
> request.courseId);
>
> RichResultSetFuture.wrap(session.executeAsync(selectStudents)).onComplete(rs
> -> {
> final List<Student> list =
> rs.all().stream().map(r -> new Student(r.getString("firstname"),
> r.getString("lastname"), r.getString("student_id"),
> r.getString("student_id"),
> r.getDouble("average"))).collect(Collectors.toList());
> parent.tell(new
> SelectStudentsResponse(request.courseId, list), self);
> context.stop(self);
> Logger.debug(self.path().toString() + "
> sender: " + sender.path().toString());
> });
> }).match(SelectAssignments.class, request -> {
> selectAssignments.setString("course_id",
> request.courseId);
>
> RichResultSetFuture.wrap(session.executeAsync(selectAssignments)).onComplete(rs
>
> -> {
> final List<Assignment> list =
> rs.all().stream().map(r -> new Assignment(r.getString("assignment_id"),
> r.getString("name"), r.getDouble("average"))).collect(Collectors.toList());
> parent.tell(new
> SelectAssignmentsResponse(request.courseId, list), self);
> context().stop(self);
> });
> }).match(UpdateTimes.class, request -> {
> updateTimes.setString("course_id",
> request.courseId);
>
> RichResultSetFuture.wrap(session.executeAsync(updateTimes)).onComplete(rs
> -> {
> final UpdateTimeHolder updateTimeHolder =
> rs.all().stream().map(r -> new
> UpdateTimeHolder(OffsetDateTime.ofInstant(r.getDate("last_student_update").toInstant(),
>
> ZoneId.systemDefault()),
>
> OffsetDateTime.ofInstant(r.getDate("last_assignment_update").toInstant(),
> ZoneId.systemDefault()),
>
> OffsetDateTime.ofInstant(r.getDate("last_grade_update").toInstant(),
> ZoneId.systemDefault())
> )).collect(Collectors.toList()).get(0);
> parent.tell(new
> UpdateTimesResponse(request.courseId, updateTimeHolder), sender);
> context().stop(self);
> });
> }).build()
> );
> }
> }
> }
>
>
>
> --
> >>>>>>>>>> Read the docs: http://akka.io/docs/
> >>>>>>>>>> Check the FAQ:
> http://doc.akka.io/docs/akka/current/additional/faq.html
> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to [email protected] <javascript:>.
> To post to this group, send email to [email protected]
> <javascript:>.
> Visit this group at http://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>
>
>
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ:
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
To post to this group, send email to [email protected].
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.