I'm using cluster-aware routers that I want to create on the fly when
needed, and I'm having trouble initializing them properly. My current
workaround is to set allow-local-routees to false and to sleep for half a
second before sending the first message, but there must be a better way
of doing this.
I have a few problems:
1. If allow-local-routees is set to true, a local routee is added even
though the node's role doesn't match the one set in the use-role property.
2. Related to the above, whether the router has the local routee depends on
how and when you start it. If you start the router before the worker node
joins, it has the local routee until the worker joins, but not afterwards.
If you start the router after the worker node has joined, it always
contains the local routee. Either way, this is inconsistent.
3. If I start the router when the worker node is already up, I can't send a
message immediately because the router doesn't contain any routees yet. I
don't see any way of detecting when it has received its routees from the
current cluster state, so I'm currently sleeping for half a second to wait.
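For illustration, the fixed sleep from point 3 could at least be replaced by a bounded poll. The following is only a sketch: awaitCondition is a hypothetical helper, not an Akka API, it assumes Java 8 for the lambda, and the condition passed in main is a stand-in for "a GetRoutees ask returned a non-empty Routees".

```java
import java.util.function.BooleanSupplier;

public class AwaitRoutees {

    /**
     * Hypothetical helper: polls 'ready' every intervalMillis until it returns
     * true or timeoutMillis has elapsed. Returns true if the condition was met.
     */
    public static boolean awaitCondition(BooleanSupplier ready,
                                         long timeoutMillis,
                                         long intervalMillis) {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (true) {
            if (ready.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false; // gave up: condition never became true in time
            }
            try {
                Thread.sleep(intervalMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag
                return false;
            }
        }
    }

    public static void main(String[] args) {
        // Stand-in condition: becomes true roughly 100 ms after start.
        // In the real test this would check the size of the router's routees.
        final long start = System.currentTimeMillis();
        boolean ok = awaitCondition(
                () -> System.currentTimeMillis() - start >= 100, 2000, 20);
        System.out.println("routees available: " + ok);
    }
}
```

This still polls rather than reacting to an event, so it only bounds the wait. It does not answer whether the router offers a proper notification.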
How should I properly make sure that cluster-aware routers are initialized?
How is allow-local-routees supposed to work: should it still check
use-role?
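To make the expectation behind these questions concrete, here is a small sketch of the rule I assumed the router would apply when deciding whether a node may host routees. It is a model of the expected behaviour only, not Akka's implementation, and the class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class RouteeEligibility {

    /**
     * Expected rule (an assumption, not taken from Akka's source): a node may
     * host routees only if its roles include useRole (when one is set) AND,
     * for the router's own node, allowLocalRoutees is true.
     */
    public static boolean isEligible(boolean isLocalNode,
                                     Set<String> memberRoles,
                                     boolean allowLocalRoutees,
                                     String useRole) {
        boolean roleMatches = useRole == null || memberRoles.contains(useRole);
        boolean localAllowed = allowLocalRoutees || !isLocalNode;
        return roleMatches && localAllowed;
    }

    public static void main(String[] args) {
        Set<String> routerNode = new HashSet<>(Arrays.asList("myotherrole"));
        Set<String> workerNode = new HashSet<>(Arrays.asList("myrole"));

        // Router's own node: local routees allowed, but the role does not match.
        System.out.println(isEligible(true, routerNode, true, "myrole"));   // false
        // Remote worker node with the matching role.
        System.out.println(isEligible(false, workerNode, true, "myrole"));  // true
    }
}
```

Under this rule the router node in the attached test would never contribute a local routee, because its role is myotherrole while use-role is myrole.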
I'm attaching a test class that reproduces my case. I'm using
Akka 2.3.0.
akka {
  actor {
    provider = "akka.cluster.ClusterActorRefProvider"
  }
  remote {
    log-remote-lifecycle-events = off
    netty.tcp {
      hostname = "127.0.0.1"
      port = 0
    }
  }
  cluster {
    auto-down-unreachable-after = 10s
    roles = [gatewayio]
  }
}
import java.util.Collections;

import scala.concurrent.Future;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.cluster.routing.ClusterRouterGroup;
import akka.cluster.routing.ClusterRouterGroupSettings;
import akka.dispatch.OnFailure;
import akka.dispatch.OnSuccess;
import akka.pattern.Patterns;
import akka.routing.ActorSelectionRoutee;
import akka.routing.GetRoutees;
import akka.routing.RoundRobinGroup;
import akka.routing.Routee;
import akka.routing.Routees;

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

public class ClusterAwareRouterTest {
    public static ActorSystem actorSystem;
    public static ActorSystem actorSystem2;

    public static void main(String[] args) throws InterruptedException {
        // Start the main actor system (role "myotherrole"); it is also the seed node.
        Config cfg = ConfigFactory.load("io_actor_system.conf");
        int port = 2500;
        cfg = ConfigFactory.parseString("akka.remote.netty.tcp.port=" + port).withFallback(cfg);
        cfg = ConfigFactory.parseString("akka.cluster.seed-nodes=[\"akka.tcp://test@127.0.0.1:" + port + "\"]").withFallback(cfg);
        cfg = ConfigFactory.parseString("akka.cluster.roles=[myotherrole]").withFallback(cfg);
        actorSystem = ActorSystem.create("test", cfg);

        // Create a cluster-aware router with allow-local-routees = true.
        ActorRef cluster_aware_router = createRouter("/user/test", true);
        // This apparently adds a local routee to the cluster-aware router.
        // NOTE: This is unexpected since the cluster role doesn't match.
        printRoutees(cluster_aware_router);
        Thread.sleep(1000);

        // Create the "worker" actor system (role "myrole"), joining via the same seed node.
        Config cfg2 = ConfigFactory.load("io_actor_system.conf");
        int port2 = 2501;
        cfg2 = ConfigFactory.parseString("akka.remote.netty.tcp.port=" + port2).withFallback(cfg2);
        cfg2 = ConfigFactory.parseString("akka.cluster.seed-nodes=[\"akka.tcp://test@127.0.0.1:" + port + "\"]").withFallback(cfg2);
        cfg2 = ConfigFactory.parseString("akka.cluster.roles=[myrole]").withFallback(cfg2);
        actorSystem2 = ActorSystem.create("test", cfg2);

        // Allow time to initialize and let the second actor system connect.
        Thread.sleep(5000);
        // Now the cluster-aware router correctly contains only the worker node.
        // NOTE: This is inconsistent since it earlier contained a local routee.
        printRoutees(cluster_aware_router);

        // Now create another cluster-aware router.
        ActorRef cluster_aware_router2 = createRouter("/user/test2", true);
        // This will contain a local routee again.
        printRoutees(cluster_aware_router2);
        // Allow time to initialize.
        Thread.sleep(1000);
        // Now it will STILL contain the local routee in addition to the worker node.
        // This is just wrong.
        printRoutees(cluster_aware_router2);

        // It doesn't matter if we let the router initialize before the first query;
        // this yields the same unexpected result.
        ActorRef cluster_aware_router3 = createRouter("/user/test2", true);
        Thread.sleep(5000);
        printRoutees(cluster_aware_router3);

        Thread.sleep(5000);
        System.exit(0);
    }

    // Asks the router for its current routees and prints each one with its address.
    private static void printRoutees(ActorRef ref) {
        Future<Object> f = Patterns.ask(ref, GetRoutees.getInstance(), 5000);
        f.onSuccess(new OnSuccess<Object>() {
            @Override
            public final void onSuccess(Object o) {
                Routees rs = (Routees) o;
                System.out.println("Routees " + rs.getRoutees().size());
                for (Routee r : rs.getRoutees()) {
                    ActorSelectionRoutee asr = (ActorSelectionRoutee) r;
                    System.out.println("Routee: " + r + " " + asr.selection() + " " + asr.selection().anchorPath() + " " + asr.selection().anchorPath().address());
                }
            }
        }, actorSystem.dispatcher());
        f.onFailure(new OnFailure() {
            @Override
            public void onFailure(Throwable t) throws Throwable {
                t.printStackTrace();
            }
        }, actorSystem.dispatcher());
    }

    // Creates a round-robin group router over the given routee path,
    // restricted to nodes with role "myrole".
    private static ActorRef createRouter(String path, boolean local) {
        int total_instances = 100;
        String service_path = path;
        Iterable<String> paths = Collections.singletonList(service_path);
        String role = "myrole";
        ActorRef router = actorSystem.actorOf(new ClusterRouterGroup(new RoundRobinGroup(paths),
                new ClusterRouterGroupSettings(total_instances, paths, local, role)).props());
        return router;
    }
}