On Wed, Jun 24, 2015 at 4:57 PM, Anders Båtstrand <[email protected]>
wrote:

> No OutOfMemory, the third node is running fine. Except is can be the
> leader, and in that case I have two leaders...
>

What are you using the leader for? There is no guarantee that there will
not be more than one leader.
For that you have to use the cluster singleton, or cluster sharding.


>
> I think I have reproduced it in the following program (let me know if you
> want the complete maven setup or similar):
>

Thanks, I will try that.


>
> application.conf:
>
> akka {
>   actor.provider = "akka.cluster.ClusterActorRefProvider"
>   remote.netty.tcp.hostname = "127.0.0.1"
>
>   cluster {
>     seed-nodes = [
>       "akka.tcp://[email protected]:2551",
>       "akka.tcp://[email protected]:2552"
>     ]
>
>     auto-down-unreachable-after = 10ms
>
>     failure-detector {
>       heartbeat-interval = 10 ms
>       threshold = 2.0
>       acceptable-heartbeat-pause = 10 ms
>       expected-response-after = 5 ms
>     }
>   }
> }
>
>
> And the test:
>
> package no.kantega.workshop.akka;
>
> import akka.actor.ActorRef;
> import akka.actor.ActorSystem;
> import akka.actor.Address;
> import akka.cluster.Cluster;
> import akka.cluster.Member;
> import akka.contrib.pattern.DistributedPubSubExtension;
> import akka.contrib.pattern.DistributedPubSubMediator;
> import com.typesafe.config.ConfigFactory;
> import org.junit.Test;
> import scala.collection.Iterator;
> import scala.collection.immutable.SortedSet;
>
> import java.util.HashSet;
> import java.util.Set;
> import java.util.concurrent.TimeUnit;
>
> import static com.jayway.awaitility.Awaitility.await;
> import static com.typesafe.config.ConfigValueFactory.fromAnyRef;
> import static org.assertj.core.api.Assertions.assertThat;
>
> public class ClusterConvergenceTest {
>
>     @Test
>     public void test_many_times() throws InterruptedException {
>         for (int i = 0; i < 100; i++) {
>             cluster_membership_is_symmetric();
>         }
>     }
>
>     @Test
>     public void cluster_membership_is_symmetric() throws InterruptedException 
> {
>
>         ActorSystem actorSystem1 = ActorSystem.apply("system", 
> ConfigFactory.load()
>                 .withValue("akka.remote.netty.tcp.port", fromAnyRef("2551")));
>
>         ActorSystem actorSystem2 = ActorSystem.apply("system", 
> ConfigFactory.load()
>                 .withValue("akka.remote.netty.tcp.port", fromAnyRef("2552")));
>
>         ActorSystem actorSystem3 = ActorSystem.apply("system", 
> ConfigFactory.load()
>                 .withValue("akka.remote.netty.tcp.port", fromAnyRef("2553")));
>
>         try {
>
>             Cluster cluster1 = Cluster.get(actorSystem1);
>             Cluster cluster2 = Cluster.get(actorSystem2);
>             Cluster cluster3 = Cluster.get(actorSystem3);
>
>             // Wait until all members can see all the others:
>             await().atMost(20, TimeUnit.SECONDS).until(() -> 
> cluster1.state().members().size() == 3);
>             await().atMost(20, TimeUnit.SECONDS).until(() -> 
> cluster2.state().members().size() == 3);
>             await().atMost(20, TimeUnit.SECONDS).until(() -> 
> cluster3.state().members().size() == 3);
>
>             System.out.println("Generate some load (we should see cluster 
> events in the console log)...");
>
>             for (ActorSystem system : new ActorSystem[]{actorSystem1, 
> actorSystem2, actorSystem3}) {
>                 ActorRef mediator = 
> DistributedPubSubExtension.get(system).mediator();
>                 for (int i = 0; i < 1_000_000; i++) {
>                     String event = "Message number  " + i;
>                     mediator.tell(new 
> DistributedPubSubMediator.Publish("dummy stream", event), 
> ActorRef.noSender());
>                 }
>             }
>
>             System.out.println("Wait for things to settle down (cluster 
> events should stop)...");
>
>             // Ideally I would have a way to know when all messages was 
> processed...
>             Thread.sleep(30_000L);
>
>             System.out.println("Check that cluster membership is 
> reflexise...");
>
>             membershipIsSymmetric(cluster1, cluster2);
>             membershipIsSymmetric(cluster2, cluster3);
>             membershipIsSymmetric(cluster1, cluster3);
>
>         } finally {
>             actorSystem1.shutdown();
>             actorSystem2.shutdown();
>             actorSystem3.shutdown();
>             actorSystem1.awaitTermination();
>             actorSystem2.awaitTermination();
>             actorSystem3.awaitTermination();
>         }
>     }
>
>     /**
>      * Here we check that cluster1 has cluster2 as a member iff cluster2 has 
> cluster1 as a member.
>      */
>     private static void membershipIsSymmetric(Cluster cluster1, Cluster 
> cluster2) {
>         if 
> (addresses(cluster1.state().members()).contains(cluster2.selfAddress())) {
>             // node 1 sees node 2, check the opposite way
>             
> assertThat(addresses(cluster2.state().members())).contains(cluster1.selfAddress());
>         } else {
>             // node 1 does not see node 2, check the opposite way
>             
> assertThat(addresses(cluster2.state().members())).doesNotContain(cluster1.selfAddress());
>         }
>     }
>
>     private static Set<Address> addresses(SortedSet<Member> members) {
>         Set<Address> set = new HashSet<>();
>         Iterator<Member> iterator = members.iterator();
>         while (iterator.hasNext()) {
>             set.add(iterator.next().address());
>         }
>         return set;
>     }
> }
>
>
> You might have to change some values on your system in case it is faster
> than mine (or slower, but I doubt that).
>
> Anders
>>
>>   --
> >>>>>>>>>> Read the docs: http://akka.io/docs/
> >>>>>>>>>> Check the FAQ:
> http://doc.akka.io/docs/akka/current/additional/faq.html
> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to [email protected].
> To post to this group, send email to [email protected].
> Visit this group at http://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>



-- 

Patrik Nordwall
Typesafe <http://typesafe.com/> -  Reactive apps on the JVM
Twitter: @patriknw

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To post to this group, send email to [email protected].
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to