No OutOfMemory, the third node is running fine. Except it can be the 
leader, and in that case I have two leaders...

I think I have reproduced it in the following program (let me know if you 
want the complete maven setup or similar):

application.conf:

akka {
  actor.provider = "akka.cluster.ClusterActorRefProvider"
  remote.netty.tcp.hostname = "127.0.0.1"

  cluster {
    seed-nodes = [
      "akka.tcp://system@127.0.0.1:2551",
      "akka.tcp://system@127.0.0.1:2552"
    ]

    auto-down-unreachable-after = 10ms

    failure-detector {
      heartbeat-interval = 10 ms
      threshold = 2.0
      acceptable-heartbeat-pause = 10 ms
      expected-response-after = 5 ms
    }
  }
}


And the test:

package no.kantega.workshop.akka;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Address;
import akka.cluster.Cluster;
import akka.cluster.Member;
import akka.contrib.pattern.DistributedPubSubExtension;
import akka.contrib.pattern.DistributedPubSubMediator;
import com.typesafe.config.ConfigFactory;
import org.junit.Test;
import scala.collection.Iterator;
import scala.collection.immutable.SortedSet;

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import static com.jayway.awaitility.Awaitility.await;
import static com.typesafe.config.ConfigValueFactory.fromAnyRef;
import static org.assertj.core.api.Assertions.assertThat;

public class ClusterConvergenceTest {

    @Test
    public void test_many_times() throws InterruptedException {
        for (int i = 0; i < 100; i++) {
            cluster_membership_is_symmetric();
        }
    }

    @Test
    public void cluster_membership_is_symmetric() throws InterruptedException {

        ActorSystem actorSystem1 = ActorSystem.apply("system", 
ConfigFactory.load()
                .withValue("akka.remote.netty.tcp.port", fromAnyRef("2551")));

        ActorSystem actorSystem2 = ActorSystem.apply("system", 
ConfigFactory.load()
                .withValue("akka.remote.netty.tcp.port", fromAnyRef("2552")));

        ActorSystem actorSystem3 = ActorSystem.apply("system", 
ConfigFactory.load()
                .withValue("akka.remote.netty.tcp.port", fromAnyRef("2553")));

        try {

            Cluster cluster1 = Cluster.get(actorSystem1);
            Cluster cluster2 = Cluster.get(actorSystem2);
            Cluster cluster3 = Cluster.get(actorSystem3);

            // Wait until all members can see all the others:
            await().atMost(20, TimeUnit.SECONDS).until(() -> 
cluster1.state().members().size() == 3);
            await().atMost(20, TimeUnit.SECONDS).until(() -> 
cluster2.state().members().size() == 3);
            await().atMost(20, TimeUnit.SECONDS).until(() -> 
cluster3.state().members().size() == 3);

            System.out.println("Generate some load (we should see cluster 
events in the console log)...");

            for (ActorSystem system : new ActorSystem[]{actorSystem1, 
actorSystem2, actorSystem3}) {
                ActorRef mediator = 
DistributedPubSubExtension.get(system).mediator();
                for (int i = 0; i < 1_000_000; i++) {
                    String event = "Message number  " + i;
                    mediator.tell(new DistributedPubSubMediator.Publish("dummy 
stream", event), ActorRef.noSender());
                }
            }

            System.out.println("Wait for things to settle down (cluster events 
should stop)...");

            // Ideally I would have a way to know when all messages was 
processed...
            Thread.sleep(30_000L);

            System.out.println("Check that cluster membership is reflexise...");

            membershipIsSymmetric(cluster1, cluster2);
            membershipIsSymmetric(cluster2, cluster3);
            membershipIsSymmetric(cluster1, cluster3);

        } finally {
            actorSystem1.shutdown();
            actorSystem2.shutdown();
            actorSystem3.shutdown();
            actorSystem1.awaitTermination();
            actorSystem2.awaitTermination();
            actorSystem3.awaitTermination();
        }
    }

    /**
     * Here we check that cluster1 has cluster2 as a member iff cluster2 has 
cluster1 as a member.
     */
    private static void membershipIsSymmetric(Cluster cluster1, Cluster 
cluster2) {
        if 
(addresses(cluster1.state().members()).contains(cluster2.selfAddress())) {
            // node 1 sees node 2, check the opposite way
            
assertThat(addresses(cluster2.state().members())).contains(cluster1.selfAddress());
        } else {
            // node 1 does not see node 2, check the opposite way
            
assertThat(addresses(cluster2.state().members())).doesNotContain(cluster1.selfAddress());
        }
    }

    private static Set<Address> addresses(SortedSet<Member> members) {
        Set<Address> set = new HashSet<>();
        Iterator<Member> iterator = members.iterator();
        while (iterator.hasNext()) {
            set.add(iterator.next().address());
        }
        return set;
    }
}


You might have to change some values on your system in case it is faster 
than mine (or slower, but I doubt that).

Anders
>
>  

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To post to this group, send email to [email protected].
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to