I am using the cluster singleton, my mistake. I somehow believed that the 
leader always hosted the singleton...

Anyway, it might be that https://github.com/akka/akka/issues/17479 is 
related. I am not downing any node manually, however, and a node will never 
down itself, right? Anyway, this bug gave me some pointers, and I will 
investigate further.

I avoid split-brain by calling System.exit on any system that has fewer than 
half of the cluster's nodes in its member list.
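
A minimal sketch of such a check, assuming the expected cluster size is known 
up front (for example from configuration). The QuorumCheck class and the 
shouldExit method are hypothetical names used only for illustration; in a real 
node the visible count would come from cluster.state().members().size():

```java
public final class QuorumCheck {
    private QuorumCheck() {}

    /**
     * Returns true when this node sees fewer than half of the expected
     * cluster, which is the shutdown condition described above.
     */
    public static boolean shouldExit(int visibleMembers, int expectedClusterSize) {
        if (visibleMembers < 0 || expectedClusterSize <= 0) {
            throw new IllegalArgumentException(
                    "visibleMembers must be >= 0 and expectedClusterSize > 0");
        }
        // Integer form of "visibleMembers < expectedClusterSize / 2",
        // written this way to avoid rounding for odd cluster sizes.
        return 2 * visibleMembers < expectedClusterSize;
    }

    public static void main(String[] args) {
        int expectedClusterSize = 3; // assumption: known from configuration
        int visibleMembers = 1;      // assumption: size of the local member list
        if (shouldExit(visibleMembers, expectedClusterSize)) {
            // A real node would call System.exit here.
            System.out.println("Minority side: this node would shut down");
        }
    }
}
```

With an even cluster size, an exact half/half split passes this check on both 
sides, so requiring a strict majority (exit when 2 * visibleMembers <= 
expectedClusterSize) is the safer condition in that case.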

Anders

On Wednesday, 24 June 2015 at 17:04:36 UTC+2, Patrik Nordwall wrote:
>
>
>
> On Wed, Jun 24, 2015 at 4:57 PM, Anders Båtstrand <[email protected]> wrote:
>
> No OutOfMemory, the third node is running fine. Except it can be the 
> leader, and in that case I have two leaders...
>
>
> What are you using the leader for? There is no guarantee that there will 
> not be more than one leader.
> For that you have to use the cluster singleton, or cluster sharding.
>  
>
>
> I think I have reproduced it in the following program (let me know if you 
> want the complete maven setup or similar):
>
>
> Thanks, I will try that.
>  
>
>
> application.conf:
>
> akka {
>   actor.provider = "akka.cluster.ClusterActorRefProvider"
>   remote.netty.tcp.hostname = "127.0.0.1"
>
>   cluster {
>     seed-nodes = [
>       "akka.tcp://[email protected]:2551",
>       "akka.tcp://[email protected]:2552"
>     ]
>
>     auto-down-unreachable-after = 10ms
>
>     failure-detector {
>       heartbeat-interval = 10 ms
>       threshold = 2.0
>       acceptable-heartbeat-pause = 10 ms
>       expected-response-after = 5 ms
>     }
>   }
> }
>
>
> And the test:
>
> package no.kantega.workshop.akka;
>
> import akka.actor.ActorRef;
> import akka.actor.ActorSystem;
> import akka.actor.Address;
> import akka.cluster.Cluster;
> import akka.cluster.Member;
> import akka.contrib.pattern.DistributedPubSubExtension;
> import akka.contrib.pattern.DistributedPubSubMediator;
> import com.typesafe.config.ConfigFactory;
> import org.junit.Test;
> import scala.collection.Iterator;
> import scala.collection.immutable.SortedSet;
>
> import java.util.HashSet;
> import java.util.Set;
> import java.util.concurrent.TimeUnit;
>
> import static com.jayway.awaitility.Awaitility.await;
> import static com.typesafe.config.ConfigValueFactory.fromAnyRef;
> import static org.assertj.core.api.Assertions.assertThat;
>
> public class ClusterConvergenceTest {
>
>     @Test
>     public void test_many_times() throws InterruptedException {
>         for (int i = 0; i < 100; i++) {
>             cluster_membership_is_symmetric();
>         }
>     }
>
>     @Test
>     public void cluster_membership_is_symmetric() throws InterruptedException {
>
>         ActorSystem actorSystem1 = ActorSystem.apply("system", ConfigFactory.load()
>                 .withValue("akka.remote.netty.tcp.port", fromAnyRef("2551")));
>
>         ActorSystem actorSystem2 = ActorSystem.apply("system", ConfigFactory.load()
>                 .withValue("akka.remote.netty.tcp.port", fromAnyRef("2552")));
>
>         ActorSystem actorSystem3 = ActorSystem.apply("system", ConfigFactory.load()
>                 .withValue("akka.remote.netty.tcp.port", fromAnyRef("2553")));
>
>         try {
>
>             Cluster cluster1 = Cluster.get(actorSystem1);
>             Cluster cluster2 = Cluster.get(actorSystem2);
>             Cluster cluster3 = Cluster.get(actorSystem3);
>
>             // Wait until all members can see all the others:
>             await().atMost(20, TimeUnit.SECONDS)
>                     .until(() -> cluster1.state().members().size() == 3);
>             await().atMost(20, TimeUnit.SECONDS)
>                     .until(() -> cluster2.state().members().size() == 3);
>             await().atMost(20, TimeUnit.SECONDS)
>                     .until(() -> cluster3.state().members().size() == 3);
>
>             System.out.println("Generate some load (we should see cluster events in the console log)...");
>
>             for (ActorSystem system : new ActorSystem[]{actorSystem1, actorSystem2, actorSystem3}) {
>                 ActorRef mediator = DistributedPubSubExtension.get(system).mediator();
>                 for (int i = 0; i < 1_000_000; i++) {
>                     String event = "Message number  " + i;
>                     mediator.tell(new DistributedPubSubMediator.Publish("dummy stream", event),
>                             ActorRef.noSender());
>                 }
>             }
>
>             System.out.println("Wait for things to settle down (cluster events should stop)...");
>
>             // Ideally I would have a way to know when all messages were processed...
>             Thread.sleep(30_000L);
>
>             System.out.println("Check that cluster membership is symmetric...");
>
>             membershipIsSymmetric(cluster1, cluster2);
>             membershipIsSymmetric(cluster2, cluster3);
>             membershipIsSymmetric(cluster1, cluster3);
>
>         } finally {
>             actorSystem1.shutdown();
>             actorSystem2.shutdown();
>             actorSystem3.shutdown();
>             actorSystem1.awaitTermination();
>             actorSystem2.awaitTermination();
>             actorSystem3.awaitTermination();
>         }
>     }
>
>     /**
>      * Here we check that cluster1 has cluster2 as a member iff cluster2 has
>      * cluster1 as a member.
>      */
>     private static void membershipIsSymmetric(Cluster cluster1, Cluster cluster2) {
>         if (addresses(cluster1.state().members()).contains(cluster2.selfAddress())) {
>             // node 1 sees node 2, check the opposite way
>             assertThat(addresses(cluster2.state().members())).contains(cluster1.selfAddress());
>         } else {
>             // node 1 does not see node 2, check the opposite way
>             assertThat(addresses(cluster2.state().members())).doesNotContain(cluster1.selfAddress());
>         }
>     }
>
>     private static Set<Address> addresses(SortedSet<Member> members) {
>         Set<Address
>
> ...

