Hi Patrik,

Yeah you're probably right. I can't really see anything wrong with the 
wrapping. Here are the complete exception details:

08:41:51.223 [PipelineNode-akka.actor.default-dispatcher-18] ERROR akka.actor.OneForOneStrategy - IO error: lock /home/lmedina/projects/pipeline/journal/LOCK: Resource temporarily unavailable
akka.actor.ActorInitializationException: exception during creation
    at akka.actor.ActorInitializationException$.apply(Actor.scala:164) ~[akka-actor_2.11-2.3.3.jar:na]
    at akka.actor.ActorCell.create(ActorCell.scala:596) ~[akka-actor_2.11-2.3.3.jar:na]
    at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:456) ~[akka-actor_2.11-2.3.3.jar:na]
    at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478) ~[akka-actor_2.11-2.3.3.jar:na]
    at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:263) ~[akka-actor_2.11-2.3.3.jar:na]
    at akka.dispatch.Mailbox.run(Mailbox.scala:219) ~[akka-actor_2.11-2.3.3.jar:na]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) ~[na:1.8.0_05]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ~[na:1.8.0_05]
    at java.lang.Thread.run(Thread.java:745) ~[na:1.8.0_05]
Caused by: org.fusesource.leveldbjni.internal.NativeDB$DBException: IO error: lock /home/lmedina/projects/pipeline/journal/LOCK: Resource temporarily unavailable
    at org.fusesource.leveldbjni.internal.NativeDB.checkStatus(NativeDB.java:200) ~[leveldbjni-1.7.jar:1.7]
    at org.fusesource.leveldbjni.internal.NativeDB.open(NativeDB.java:218) ~[leveldbjni-1.7.jar:1.7]
    at org.fusesource.leveldbjni.JniDBFactory.open(JniDBFactory.java:168) ~[leveldbjni-1.7.jar:1.7]
    at akka.persistence.journal.leveldb.LeveldbStore$class.preStart(LeveldbStore.scala:114) ~[akka-persistence-experimental_2.11-2.3.3.jar:na]
    at akka.persistence.journal.leveldb.LeveldbJournal.preStart(LeveldbJournal.scala:20) ~[akka-persistence-experimental_2.11-2.3.3.jar:na]
    at akka.actor.Actor$class.aroundPreStart(Actor.scala:470) ~[akka-actor_2.11-2.3.3.jar:na]
    at akka.persistence.journal.leveldb.LeveldbJournal.aroundPreStart(LeveldbJournal.scala:20) ~[akka-persistence-experimental_2.11-2.3.3.jar:na]
    at akka.actor.ActorCell.create(ActorCell.scala:580) ~[akka-actor_2.11-2.3.3.jar:na]
    ... 7 common frames omitted
08:41:51.226 [PipelineNode-akka.actor.default-dispatcher-18] INFO  akka.actor.RepointableActorRef - Message [akka.persistence.JournalProtocol$ReplayMessages] from Actor[akka://PipelineNode/user/sharding/Streams/1#-1640969238] to Actor[akka://PipelineNode/system/journal#101608488] was not delivered. [6] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.

To be even more descriptive, here are the exact steps that lead to this:

1. I start up my Node class which is what creates an ActorSystem:

ActorSystem system = ActorSystem.create("PipelineNode", configuration);

a singleton WatchDog (and a proxy to communicate with it):

system.actorOf(ClusterSingletonManager.defaultProps(WatchDogActor.props(),
        "WatchDogSingleton", PoisonPill.getInstance(), ""), "WatchDog");

ActorRef watchdogProxy = system.actorOf(
        ClusterSingletonProxy.defaultProps("/user/WatchDog/WatchDogSingleton", ""),
        "WatchDogProxy");

and finally my sharded StreamSupervisor:

ClusterSharding.get(system).start("Streams",
                                  StreamSupervisor.props(),
                                  new MessageExtractor());

In this case, my WatchDog is what is going to be managing the streams being 
started up by the StreamSupervisor.

2. Next, I create a Stream object, which essentially defines a stream, and 
pass it to my WatchDog through its proxy, wrapped in a Persistent object so 
that the WatchDog can persist this information (in this case wrapping the 
stream in a Persistent object and sending it along gives me no issues):

Stream stream = new Stream("Streams", 1L,
        StreamActor.props(config, new TwitterStreamManagerBuilder()));
watchdogProxy.tell(Persistent.create(stream), ActorRef.noSender());

Among the information included in the Stream object are its name 
("Streams") which will be used to identify the correct ShardRegion and its 
id value (1L) which is what the MessageExtractor will use for its entryId and 
to generate its shardId (id % x).
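
As a rough sketch of that id mapping (plain Java with no Akka types, so it 
can be run on its own): the class name StreamShardIds and the numberOfShards 
parameter are made up for illustration, with numberOfShards standing in for 
the x in (id % x). The real logic sits inside the MessageExtractor.

```java
/** Hypothetical helper mirroring the entryId/shardId derivation described above. */
final class StreamShardIds {
    // Stands in for "x" in (id % x); the actual value is not given in this mail.
    private final int numberOfShards;

    StreamShardIds(int numberOfShards) {
        if (numberOfShards <= 0) {
            throw new IllegalArgumentException("numberOfShards must be positive");
        }
        this.numberOfShards = numberOfShards;
    }

    /** The entry id is simply the stream id rendered as a string. */
    String entryId(long streamId) {
        return String.valueOf(streamId);
    }

    /** The shard id is (id % x); floorMod keeps it non-negative for negative ids. */
    String shardId(long streamId) {
        return String.valueOf(Math.floorMod(streamId, (long) numberOfShards));
    }
}
```

With the stream from step 2 (id 1L), entryId is "1", which matches the 
/user/sharding/Streams/1 path in the dead-letter log line above.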

3. Now my WatchDog singleton will receive the Stream information and it 
will create a new instance of the stream by sending a new ShardMessage to 
the matching ShardRegion:

ActorRef region =
        ClusterSharding.get(getContext().system()).shardRegion(stream.getName());
region.tell(new ShardMessage(Persistent.create(stream.getProps()), stream.getId()),
        getSelf());

The WatchDog will keep sending the props object to the StreamSupervisor until 
it receives an acknowledgement from it.
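
The resend-until-acknowledged bookkeeping can be sketched in plain Java like 
this. It is only an illustration of the idea, not the actual WatchDog code: 
the PendingDeliveries class and its method names are hypothetical, and the 
send callback stands in for region.tell(...).

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiConsumer;

/** Hypothetical tracker for messages that must be resent until acknowledged. */
final class PendingDeliveries<M> {
    // Messages sent but not yet acknowledged, keyed by stream id.
    private final Map<Long, M> pending = new LinkedHashMap<>();
    // Stand-in for the actual send, e.g. region.tell(...) in the WatchDog.
    private final BiConsumer<Long, M> send;

    PendingDeliveries(BiConsumer<Long, M> send) {
        this.send = send;
    }

    /** Sends the message once and remembers it until it is acknowledged. */
    void deliver(long id, M message) {
        pending.put(id, message);
        send.accept(id, message);
    }

    /** Called when the receiver's acknowledgement for this id arrives. */
    void acknowledge(long id) {
        pending.remove(id);
    }

    /** Resends everything still unacknowledged; iterates over a copy of the map. */
    void resendPending() {
        new LinkedHashMap<>(pending).forEach(send);
    }

    int pendingCount() {
        return pending.size();
    }
}
```

In an actor, resendPending() would presumably be driven by a scheduled tick 
message and acknowledge() by the acknowledgement from the StreamSupervisor.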

4. The StreamSupervisor should then receive the Persistent-wrapped Props 
object and instantiate an actor from it, except that it never gets this 
far:

public class StreamSupervisor extends UntypedProcessor {
    private Props props;

    private ActorRef stream;

    public static Props props() {
        return Props.create(StreamSupervisor.class);
    }

    @Override
    public void onReceive(Object message) {
        if (message instanceof Persistent) {
            onMessage((Persistent) message);
        }
    }

    public void onMessage(Persistent message) {
        Object payload = message.payload();

        if (payload instanceof Props) {
            props = (Props) payload;

            initializeStream();

            getContext().become(initialized);
        } 
    }

    private void initializeStream() {
        stream = getContext().actorOf(props);
        getContext().watch(stream);
    }

    private final Behavior initialized = new Behavior() {
        @Override
        public void apply(Object message) {
            if (message instanceof Terminated) {
                Terminated terminated = (Terminated) message;

                // Compare ActorRefs with equals() rather than reference identity.
                if (terminated.getActor().equals(stream)) {
                    getContext().unwatch(stream);
                    initializeStream();
                }
            }
        }
    };
}

That is pretty much it. Also, I don't know if it's of any significance here 
but when I start up my cluster, I set the min-nr-of-members = 3.
