No OutOfMemory, the third node is running fine. Except it can be the
leader, and in that case I have two leaders...
I think I have reproduced it in the following program (let me know if you
want the complete maven setup or similar):
application.conf:
akka {
  actor.provider = "akka.cluster.ClusterActorRefProvider"
  remote.netty.tcp.hostname = "127.0.0.1"
  cluster {
    # Seed nodes are the actor systems named "system" that the test below
    # starts on 127.0.0.1 with ports 2551 and 2552 (the third node, on 2553,
    # joins through them).
    seed-nodes = [
      "akka.tcp://system@127.0.0.1:2551",
      "akka.tcp://system@127.0.0.1:2552"
    ]
    # NOTE(review): the timings below are extremely short, presumably on
    # purpose so that heavy load makes nodes appear unreachable and get
    # auto-downed; adjust them for the speed of your machine.
    auto-down-unreachable-after = 10ms
    failure-detector {
      heartbeat-interval = 10 ms
      threshold = 2.0
      acceptable-heartbeat-pause = 10 ms
      expected-response-after = 5 ms
    }
  }
}
And the test:
package no.kantega.workshop.akka;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Address;
import akka.cluster.Cluster;
import akka.cluster.Member;
import akka.contrib.pattern.DistributedPubSubExtension;
import akka.contrib.pattern.DistributedPubSubMediator;
import com.typesafe.config.ConfigFactory;
import org.junit.Test;
import scala.collection.Iterator;
import scala.collection.immutable.SortedSet;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import static com.jayway.awaitility.Awaitility.await;
import static com.typesafe.config.ConfigValueFactory.fromAnyRef;
import static org.assertj.core.api.Assertions.assertThat;
public class ClusterConvergenceTest {
@Test
public void test_many_times() throws InterruptedException {
for (int i = 0; i < 100; i++) {
cluster_membership_is_symmetric();
}
}
@Test
public void cluster_membership_is_symmetric() throws InterruptedException {
ActorSystem actorSystem1 = ActorSystem.apply("system",
ConfigFactory.load()
.withValue("akka.remote.netty.tcp.port", fromAnyRef("2551")));
ActorSystem actorSystem2 = ActorSystem.apply("system",
ConfigFactory.load()
.withValue("akka.remote.netty.tcp.port", fromAnyRef("2552")));
ActorSystem actorSystem3 = ActorSystem.apply("system",
ConfigFactory.load()
.withValue("akka.remote.netty.tcp.port", fromAnyRef("2553")));
try {
Cluster cluster1 = Cluster.get(actorSystem1);
Cluster cluster2 = Cluster.get(actorSystem2);
Cluster cluster3 = Cluster.get(actorSystem3);
// Wait until all members can see all the others:
await().atMost(20, TimeUnit.SECONDS).until(() ->
cluster1.state().members().size() == 3);
await().atMost(20, TimeUnit.SECONDS).until(() ->
cluster2.state().members().size() == 3);
await().atMost(20, TimeUnit.SECONDS).until(() ->
cluster3.state().members().size() == 3);
System.out.println("Generate some load (we should see cluster
events in the console log)...");
for (ActorSystem system : new ActorSystem[]{actorSystem1,
actorSystem2, actorSystem3}) {
ActorRef mediator =
DistributedPubSubExtension.get(system).mediator();
for (int i = 0; i < 1_000_000; i++) {
String event = "Message number " + i;
mediator.tell(new DistributedPubSubMediator.Publish("dummy
stream", event), ActorRef.noSender());
}
}
System.out.println("Wait for things to settle down (cluster events
should stop)...");
// Ideally I would have a way to know when all messages was
processed...
Thread.sleep(30_000L);
System.out.println("Check that cluster membership is reflexise...");
membershipIsSymmetric(cluster1, cluster2);
membershipIsSymmetric(cluster2, cluster3);
membershipIsSymmetric(cluster1, cluster3);
} finally {
actorSystem1.shutdown();
actorSystem2.shutdown();
actorSystem3.shutdown();
actorSystem1.awaitTermination();
actorSystem2.awaitTermination();
actorSystem3.awaitTermination();
}
}
/**
* Here we check that cluster1 has cluster2 as a member iff cluster2 has
cluster1 as a member.
*/
private static void membershipIsSymmetric(Cluster cluster1, Cluster
cluster2) {
if
(addresses(cluster1.state().members()).contains(cluster2.selfAddress())) {
// node 1 sees node 2, check the opposite way
assertThat(addresses(cluster2.state().members())).contains(cluster1.selfAddress());
} else {
// node 1 does not see node 2, check the opposite way
assertThat(addresses(cluster2.state().members())).doesNotContain(cluster1.selfAddress());
}
}
private static Set<Address> addresses(SortedSet<Member> members) {
Set<Address> set = new HashSet<>();
Iterator<Member> iterator = members.iterator();
while (iterator.hasNext()) {
set.add(iterator.next().address());
}
return set;
}
}
You might have to change some values on your system in case it is faster
than mine (or slower, but I doubt that).
Anders
>
>
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ:
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to akka-user+unsubscribe@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.