I am trying to write a very basic app in which messages are sent to an
actor at a high rate, the actor consumes them at a slower rate, and the
app is killed after some time.
When I ran the app again with the same actor system name, the same actor
name, and the same persistenceId, I expected to see the missed messages
replayed, but that does not happen.
(If I delete the journal and snapshot directories, they are created again
on the next run and contain files that are not 0 bytes in size, so
something is definitely being written.)
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;

public class App {
    public static void main(String[] args) throws InterruptedException {
        System.out.println("Hello World!");
        ActorSystem actorSystem = ActorSystem.create("sample-actor-system");
        ActorRef sampleActor = actorSystem.actorOf(
                Props.create(AkkaWorker.class).withDispatcher(
                        "akka.actor.test-dispatcher"),
                "sample-actor");
        System.out.println(actorSystem.settings().config());
        int i = 1;
        // Run this code the next time so that nothing is published, and only
        // the replayed messages should be executed by the actor
        // Thread.sleep(10000);
        // System.exit(0);
        while (true) {
            String msg = "Hello there" + i;
            // Publish faster (every 100 ms) than the actor consumes.
            sampleActor.tell(msg, ActorRef.noSender());
            System.out.println("Published message: " + msg);
            i++;
            // break;
            Thread.sleep(100);
            if (i == 20) {
                // Wait a while, then kill the app with messages still pending.
                Thread.sleep(10000);
                System.exit(0);
            }
        }
    }
}
import akka.persistence.UntypedPersistentActor;

public class AkkaWorker extends UntypedPersistentActor {
    public AkkaWorker() {
    }

    @Override
    public String persistenceId() {
        return "sample-id-1";
    }

    @Override
    public void onReceiveCommand(Object message) throws Exception {
        System.out.println("In onReceiveCommand");
        if (message instanceof String) {
            System.out.println("Received message: " + message);
            if (((String) message).equalsIgnoreCase("suicide")) {
                System.out.println("killing self");
                getContext().stop(getSelf());
            }
            // Simulate a slow consumer.
            Thread.sleep(1000);
        }
    }

    @Override
    public void onReceiveRecover(Object message) {
        System.out.println("In onReceiveRecover");
        if (message instanceof String) {
            System.out.println(message);
        } else {
            System.out.println("God knows what: " + message.toString());
        }
    }
}
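
To make my expectation concrete, here is a minimal sketch, written
without Akka, of one possible explanation for what I am seeing. It assumes
(my assumption, not something I have confirmed) that only events handed
explicitly to persist(...) end up in the journal, and that messages still
waiting in the mailbox are simply lost when the process dies. The class
JournalSketch, its Worker, and the methods tell and processOne are all
hypothetical names invented for this illustration; they are not Akka APIs.

```java
import java.util.ArrayList;
import java.util.List;

public class JournalSketch {
    // Hypothetical stand-in for the journal: the only state that
    // survives a "restart" in this model.
    static final List<String> journal = new ArrayList<>();

    // Hypothetical stand-in for a persistent actor.
    static class Worker {
        // In-memory mailbox: assumed to be lost when the process is killed.
        final List<String> mailbox = new ArrayList<>();
        // What this incarnation saw during recovery.
        final List<String> recovered = new ArrayList<>();

        Worker() {
            // Recovery replays journal entries only, never mailbox contents.
            recovered.addAll(journal);
        }

        void tell(String msg) {
            mailbox.add(msg);
        }

        // Handle one message; write it to the journal only if asked to.
        void processOne(boolean persist) {
            if (mailbox.isEmpty()) {
                return;
            }
            String msg = mailbox.remove(0);
            if (persist) {
                journal.add(msg);
            }
        }
    }

    public static void main(String[] args) {
        Worker first = new Worker();
        for (int i = 1; i <= 5; i++) {
            first.tell("Hello there" + i);
        }
        first.processOne(true);   // handled and journaled
        first.processOne(false);  // handled but never journaled

        // "Kill" the first worker: three messages are still in its mailbox.
        Worker second = new Worker();
        System.out.println("Recovered: " + second.recovered);
        System.out.println("Left in old mailbox: " + first.mailbox.size());
    }
}
```

Under that assumption, only the one journaled message is seen by the
second worker; the message handled without persisting and the three still
queued are not replayed. If this model is right, it would match my actor
above, which never journals anything in onReceiveCommand.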
In application.conf,
test-dispatcher {
  type = Dispatcher
  executor = "fork-join-executor"
  fork-join-executor {
    parallelism-min = 1
    parallelism-factor = 1.0
    parallelism-max = 1
  }
  throughput = 1
}
persistence {
  journal {
    max-message-batch-size = 1
    leveldb {
      dir = "/Users/neeraj/akka-persist/journal"
      native = true
    }
  }
  snapshot-store.local.dir = "/Users/neeraj/akka-persist/snapshot"
}