I am trying to write a very basic app in which messages are sent to an
actor at a high rate, the actor consumes them at a slower rate, and the
app is killed after some time.
When I run the app again with the same actor system name, the same actor
name, and the same persistenceId, I expect to see the missed messages
replayed, but that does not happen.

(If I delete the journal and snapshot directories, they are created again
on the next run and contain some non-empty files, so something is
definitely being written.)

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;

public class App {
    public static void main(String[] args) throws InterruptedException {
        System.out.println("Hello World!");
        ActorSystem actorSystem = ActorSystem.create("sample-actor-system");
        ActorRef sampleActor = actorSystem.actorOf(
                Props.create(AkkaWorker.class).withDispatcher(
                        "akka.actor.test-dispatcher"),
                "sample-actor");
        System.out.println(actorSystem.settings().config());
        int i = 1;
        // Run this code on the next run so that nothing is published and
        // only the replayed messages are processed by the actor:
//        Thread.sleep(10000);
//        System.exit(0);
        while (true) {
            String msg = "Hello there" + i;
            sampleActor.tell(msg, ActorRef.noSender());
            System.out.println("Published message: " + msg);
            i++;
            // break;
            Thread.sleep(100);
            if (i == 20) {
                Thread.sleep(10000);
                System.exit(0);
            }
        }
    }
}


import akka.persistence.UntypedPersistentActor;

public class AkkaWorker extends UntypedPersistentActor {

    public AkkaWorker() {

    }

    @Override
    public String persistenceId() {
        return "sample-id-1";
    }

    @Override
    public void onReceiveCommand(Object message) throws Exception {
        System.out.println("In onReceiveCommand");
        if (message instanceof String) {
            String text = (String) message;
            System.out.println("Received message: " + text);
            if (text.equalsIgnoreCase("suicide")) {
                System.out.println("killing self");
                getContext().stop(getSelf());
            }

            // Simulate a slow consumer.
            Thread.sleep(1000);
        }
    }

    @Override
    public void onReceiveRecover(Object message) {
        System.out.println("In onReceiveRecover");
        if (message instanceof String) {
            System.out.println(message);
        } else {
            System.out.println("God knows what: "+ message.toString());
        }
    }

}

In application.conf:

test-dispatcher {
  type = Dispatcher
  executor = "fork-join-executor"
  fork-join-executor {
    parallelism-min = 1
    parallelism-factor = 1.0
    parallelism-max = 1
  }
  throughput = 1
}
persistence {
  journal {
    max-message-batch-size = 1
    leveldb {
      dir = "/Users/neeraj/akka-persist/journal"
      native = true
    }
  }
  snapshot-store.local.dir = "/Users/neeraj/akka-persist/snapshot"
}
