Hi Patrik,
Yeah, you're probably right. I can't see anything wrong with the wrapping.
Here are the complete exception details:

08:41:51.223 [PipelineNode-akka.actor.default-dispatcher-18] ERROR akka.actor.OneForOneStrategy - IO error: lock /home/lmedina/projects/pipeline/journal/LOCK: Resource temporarily unavailable
akka.actor.ActorInitializationException: exception during creation
    at akka.actor.ActorInitializationException$.apply(Actor.scala:164) ~[akka-actor_2.11-2.3.3.jar:na]
    at akka.actor.ActorCell.create(ActorCell.scala:596) ~[akka-actor_2.11-2.3.3.jar:na]
    at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:456) ~[akka-actor_2.11-2.3.3.jar:na]
    at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478) ~[akka-actor_2.11-2.3.3.jar:na]
    at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:263) ~[akka-actor_2.11-2.3.3.jar:na]
    at akka.dispatch.Mailbox.run(Mailbox.scala:219) ~[akka-actor_2.11-2.3.3.jar:na]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) ~[na:1.8.0_05]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ~[na:1.8.0_05]
    at java.lang.Thread.run(Thread.java:745) ~[na:1.8.0_05]
Caused by: org.fusesource.leveldbjni.internal.NativeDB$DBException: IO error: lock /home/lmedina/projects/pipeline/journal/LOCK: Resource temporarily unavailable
    at org.fusesource.leveldbjni.internal.NativeDB.checkStatus(NativeDB.java:200) ~[leveldbjni-1.7.jar:1.7]
    at org.fusesource.leveldbjni.internal.NativeDB.open(NativeDB.java:218) ~[leveldbjni-1.7.jar:1.7]
    at org.fusesource.leveldbjni.JniDBFactory.open(JniDBFactory.java:168) ~[leveldbjni-1.7.jar:1.7]
    at akka.persistence.journal.leveldb.LeveldbStore$class.preStart(LeveldbStore.scala:114) ~[akka-persistence-experimental_2.11-2.3.3.jar:na]
    at akka.persistence.journal.leveldb.LeveldbJournal.preStart(LeveldbJournal.scala:20) ~[akka-persistence-experimental_2.11-2.3.3.jar:na]
    at akka.actor.Actor$class.aroundPreStart(Actor.scala:470) ~[akka-actor_2.11-2.3.3.jar:na]
    at akka.persistence.journal.leveldb.LeveldbJournal.aroundPreStart(LeveldbJournal.scala:20) ~[akka-persistence-experimental_2.11-2.3.3.jar:na]
    at akka.actor.ActorCell.create(ActorCell.scala:580) ~[akka-actor_2.11-2.3.3.jar:na]
    ... 7 common frames omitted
08:41:51.226 [PipelineNode-akka.actor.default-dispatcher-18] INFO akka.actor.RepointableActorRef - Message [akka.persistence.JournalProtocol$ReplayMessages] from Actor[akka://PipelineNode/user/sharding/Streams/1#-1640969238] to Actor[akka://PipelineNode/system/journal#101608488] was not delivered. [6] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.

To be more specific, here are the exact steps that lead to this:

1. I start up my Node class, which creates an ActorSystem:

    ActorSystem system = ActorSystem.create("PipelineNode", configuration);

then a singleton WatchDog (and a proxy to communicate with it):

    system.actorOf(
        ClusterSingletonManager.defaultProps(
            WatchDogActor.props(), "WatchDogSingleton", PoisonPill.getInstance(), ""),
        "WatchDog");
    ActorRef watchdogProxy = system.actorOf(
        ClusterSingletonProxy.defaultProps("/user/WatchDog/WatchDogSingleton", ""),
        "WatchDogProxy");

and finally my sharded StreamSupervisor:

    ClusterSharding.get(system).start("Streams",
        StreamSupervisor.props(),
        new MessageExtractor());

In this setup, the WatchDog manages the streams that the StreamSupervisor
starts up.

2. Next, I create a Stream object, which essentially defines a stream, and
pass it to my WatchDog through its proxy, wrapped in a Persistent object so
that the WatchDog can persist this information (wrapping the stream in a
Persistent object and sending it along gives me no issues here):

    Stream stream = new Stream("Streams", 1L,
        StreamActor.props(config, new TwitterStreamManagerBuilder()));
    watchdogProxy.tell(Persistent.create(stream), ActorRef.noSender());

The information in the Stream object includes its name ("Streams"), which is
used to identify the correct ShardRegion, and its id value (1L), which the
MessageExtractor uses as its entryId and to generate its shardId (id % x).
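
To make the id-to-shard mapping concrete, here is a minimal sketch in plain
Java, not the actual MessageExtractor. It mirrors the three methods of Akka
2.3's ShardRegion.MessageExtractor (entryId, entryMessage, shardId) without
depending on Akka, so it compiles on its own. The ShardMessage accessors, the
SketchExtractor name and the shard count of 10 standing in for "x" are all
assumptions made for illustration.

```java
// Hypothetical stand-in for the ShardMessage envelope: a payload plus the stream id.
final class ShardMessage {
    private final Object payload;
    private final long id;

    ShardMessage(Object payload, long id) {
        this.payload = payload;
        this.id = id;
    }

    Object getPayload() { return payload; }
    long getId() { return id; }
}

// Plain-Java sketch with the same method names as ShardRegion.MessageExtractor.
final class SketchExtractor {
    // Assumption: stands in for the unspecified "x" in (id % x).
    static final long NUMBER_OF_SHARDS = 10;

    // The entry id is the stream id rendered as a string.
    String entryId(Object message) {
        if (message instanceof ShardMessage) {
            return String.valueOf(((ShardMessage) message).getId());
        }
        return null; // unknown message type, cannot be routed
    }

    // The entry receives the unwrapped payload, not the envelope.
    Object entryMessage(Object message) {
        if (message instanceof ShardMessage) {
            return ((ShardMessage) message).getPayload();
        }
        return message;
    }

    // The shard id is derived from the stream id as (id % x).
    String shardId(Object message) {
        if (message instanceof ShardMessage) {
            return String.valueOf(((ShardMessage) message).getId() % NUMBER_OF_SHARDS);
        }
        return null;
    }
}
```

With these assumptions, the stream with id 1L would get entryId "1" and land
in shard "1".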

3. Now my WatchDog singleton receives the Stream information and creates a
new instance of the stream by sending a new ShardMessage to the region:

    ActorRef region =
        ClusterSharding.get(getContext().system()).shardRegion(stream.getName());
    region.tell(new ShardMessage(Persistent.create(stream.getProps()),
        stream.getId()), getSelf());

The WatchDog will keep sending the Props object to the StreamSupervisor until
it receives an acknowledgement from it.
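
The resend-until-acknowledged bookkeeping can be sketched roughly as follows.
This is only an illustration in plain Java, not the actual WatchDog code: the
PendingDeliveries class and its method names are hypothetical, and it assumes
the WatchDog calls dueForResend() on some periodic tick and re-sends a
ShardMessage for each id it returns.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: tracks payloads that were sent but not yet acknowledged.
final class PendingDeliveries {
    // Insertion-ordered so resends happen in the order the streams were registered.
    private final Map<Long, Object> pending = new LinkedHashMap<>();

    // Remember the payload for this stream id until it is acknowledged.
    void track(long streamId, Object payload) {
        pending.put(streamId, payload);
    }

    // Called when an acknowledgement arrives; returns true if the id was still pending.
    boolean acknowledge(long streamId) {
        return pending.remove(streamId) != null;
    }

    // Called on every retry tick: a snapshot of the ids that still need resending.
    List<Long> dueForResend() {
        return new ArrayList<>(pending.keySet());
    }

    // The payload to resend for a pending id, or null if it was already acknowledged.
    Object payloadFor(long streamId) {
        return pending.get(streamId);
    }
}
```

Once an id has been acknowledged it no longer shows up in dueForResend(), so
the resending stops for that stream only.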

4. The StreamSupervisor should then receive the Persistent-wrapped Props
object and instantiate an actor with it, except that it never gets this far:
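
    import akka.actor.ActorRef;
    import akka.actor.Props;
    import akka.actor.Terminated;
    import akka.persistence.Persistent;
    import akka.persistence.UntypedProcessor;

    public class StreamSupervisor extends UntypedProcessor {
        private Props props;
        private ActorRef stream;

        public static Props props() {
            return Props.create(StreamSupervisor.class);
        }

        @Override
        public void onReceive(Object message) {
            if (message instanceof Persistent) {
                onMessage((Persistent) message);
            }
        }

        // Expects a Persistent-wrapped Props, starts the stream from it and
        // switches to the initialized behavior.
        public void onMessage(Persistent message) {
            Object payload = message.payload();
            if (payload instanceof Props) {
                props = (Props) payload;
                initializeStream();
                getContext().become(initialized);
            }
        }

        // Creates the stream actor as a child and watches it for termination.
        private void initializeStream() {
            stream = getContext().actorOf(props);
            getContext().watch(stream);
        }

        // Once initialized, restart the stream whenever it terminates.
        private final Behavior initialized = new Behavior() {
            @Override
            public void apply(Object message) {
                if (message instanceof Terminated) {
                    Terminated terminated = (Terminated) message;
                    // Compare ActorRefs with equals() rather than ==.
                    if (terminated.getActor().equals(stream)) {
                        getContext().unwatch(stream);
                        initializeStream();
                    }
                }
            }
        };
    }
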
That is pretty much it. Also, I don't know if it's of any significance here,
but when I start up my cluster I set min-nr-of-members = 3.