I'm using cluster-aware routers that I want to create on the fly when 
needed, and I'm having trouble initializing them properly. Right now my 
workaround is to set allow-local-routees to false and to sleep for half a 
second before sending the first message, but there must be a better way 
of doing this.

I have a few problems:
1. If allow-local-routees is set to true, local routees are added even 
though the local node's role doesn't match the one set in the use-role 
property.
2. Related to the above: depending on how and when you start the router, it 
may or may not have the local routee. If you start the router before the 
worker node joins, it has the local routee until the worker joins, but not 
afterwards. If you start the router after the worker node has joined, it 
always contains the local routee. Either way, this is inconsistent.
3. If I start the router when the worker node is already up, I can't send a 
message immediately, since the router doesn't contain any routees yet. I 
don't see any way of detecting when it has received the routees from the 
current cluster state, so I'm currently sleeping for half a second to wait 
for this.
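
To illustrate what I mean by replacing the fixed sleep in point 3, here is a minimal sketch of a poll-until-ready helper. It is plain Java 8 with no Akka dependency; the class name AwaitCondition and its parameters are made up for this example and are not part of any Akka API.

```java
import java.util.concurrent.TimeoutException;
import java.util.function.BooleanSupplier;

// Hypothetical helper (not part of Akka): polls a condition until it holds
// or a deadline passes, instead of sleeping for a fixed amount of time.
public final class AwaitCondition {

    private AwaitCondition() {}

    public static void await(BooleanSupplier condition, long timeoutMillis, long pollMillis)
            throws InterruptedException, TimeoutException {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        // Check first, so an already-satisfied condition returns without sleeping.
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() - deadline >= 0) {
                throw new TimeoutException("condition not met within " + timeoutMillis + " ms");
            }
            Thread.sleep(pollMillis);
        }
    }
}
```

In my test class the condition could ask the router for GetRoutees and return true once the reply contains at least one routee. That is still polling, though, so it only narrows the wait; it doesn't tell me when the router has actually processed the current cluster state.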

How should I properly make sure that cluster-aware routers get initialized? 
And how is allow-local-routees supposed to work: should it still check 
use-role?
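
To make the question concrete, this is the rule I would have expected, written as a small stand-alone sketch. It is only my assumption about the intended semantics, not Akka's actual implementation, and the class and method names are invented for illustration.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of the EXPECTED rule, not Akka's code:
// a node may host a routee only if its roles match use-role, and the
// local node additionally requires allow-local-routees to be true.
public final class RouteeEligibility {

    private RouteeEligibility() {}

    // useRole == null is taken to mean "no role restriction" (an assumption).
    public static boolean isEligible(boolean isLocalNode, Set<String> nodeRoles,
                                     boolean allowLocalRoutees, String useRole) {
        boolean roleMatches = useRole == null || nodeRoles.contains(useRole);
        boolean localityOk = allowLocalRoutees || !isLocalNode;
        return roleMatches && localityOk;
    }

    public static void main(String[] args) {
        Set<String> localRoles = new HashSet<>(Arrays.asList("myotherrole"));
        Set<String> workerRoles = new HashSet<>(Arrays.asList("myrole"));
        // Local node with a non-matching role: expected to be excluded.
        System.out.println(isEligible(true, localRoles, true, "myrole"));   // false
        // Remote worker with the matching role: expected to be included.
        System.out.println(isEligible(false, workerRoles, true, "myrole")); // true
    }
}
```

With the roles from my test (local node "myotherrole", worker "myrole", use-role "myrole"), this rule would never put a routee on the local node, which is not what I observe.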

I'm attaching a test class that I can reproduce my case with. I'm using 
Akka 2.3.0.

akka {
    actor {
        provider = "akka.cluster.ClusterActorRefProvider"
    }

    remote {
        log-remote-lifecycle-events = off
        netty.tcp {
            hostname = "127.0.0.1"
            port = 0
        }
    }

    cluster {
        auto-down-unreachable-after = 10s
        roles = [gatewayio]
    }
}
import java.util.Collections;

import scala.concurrent.Future;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.cluster.routing.ClusterRouterGroup;
import akka.cluster.routing.ClusterRouterGroupSettings;
import akka.dispatch.OnFailure;
import akka.dispatch.OnSuccess;
import akka.pattern.Patterns;
import akka.routing.ActorSelectionRoutee;
import akka.routing.GetRoutees;
import akka.routing.RoundRobinGroup;
import akka.routing.Routee;
import akka.routing.Routees;

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;


public class ClusterAwareRouterTest {
	
	public static ActorSystem actorSystem;
	
	public static ActorSystem actorSystem2;

	public static void main(String[] args) throws InterruptedException {
		//start the main actor system
		Config cfg = ConfigFactory.load("io_actor_system.conf");
		int port = 2500;
		cfg = ConfigFactory.parseString("akka.remote.netty.tcp.port=" + port).withFallback(cfg);
		cfg = ConfigFactory.parseString("akka.cluster.seed-nodes=[\"akka.tcp://test@127.0.0.1:" + port + "\"]").withFallback(cfg);
		cfg = ConfigFactory.parseString("akka.cluster.roles=[myotherrole]").withFallback(cfg);
		actorSystem = ActorSystem.create("test", cfg);
		
		//create a cluster aware router
		ActorRef cluster_aware_router = createRouter("/user/test", true);
		
		//this will apparently add a local routee on the cluster aware router
		//NOTE: This is unexpected since the cluster role doesn't match
		printRoutees(cluster_aware_router);
		
		Thread.sleep(1000);
		
		//create the "worker" actor system
		Config cfg2 = ConfigFactory.load("io_actor_system.conf");
		int port2 = 2501;
		cfg2 = ConfigFactory.parseString("akka.remote.netty.tcp.port=" + port2).withFallback(cfg2);
		cfg2 = ConfigFactory.parseString("akka.cluster.seed-nodes=[\"akka.tcp://test@127.0.0.1:" + port + "\"]").withFallback(cfg2);
		cfg2 = ConfigFactory.parseString("akka.cluster.roles=[myrole]").withFallback(cfg2);
		actorSystem2 = ActorSystem.create("test", cfg2);
		
		//allow time to initialize and let the second actor system connect
		Thread.sleep(5000);
		
		//now the cluster aware router will correctly only contain the worker node
		//NOTE: This is inconsistent since it earlier contained a local routee
		printRoutees(cluster_aware_router);
		
		//Now create another cluster aware router
		ActorRef cluster_aware_router2 = createRouter("/user/test2", true);
		
		//this will contain a local routee again
		printRoutees(cluster_aware_router2);
		
		//allow time to initialize
		Thread.sleep(1000);
		
		//now it will STILL contain the local routee in addition to the worker node
		//this is just wrong
		printRoutees(cluster_aware_router2);
		
		//Doesn't matter if we let it initialize first
		//will yield same unexpected result
		ActorRef cluster_aware_router3 = createRouter("/user/test2", true);
		Thread.sleep(5000);
		printRoutees(cluster_aware_router3);
		
		Thread.sleep(5000);
		System.exit(0);
	}
	
	private static void printRoutees(ActorRef ref) {
		Future<Object> f = Patterns.ask(ref, GetRoutees.getInstance(), 5000);
		f.onSuccess(new OnSuccess<Object>() {
			public final void onSuccess(Object o) {
				Routees rs = (Routees)o;
				System.out.println("Routees " + rs.getRoutees().size());
				for(Routee r : rs.getRoutees()) {
					ActorSelectionRoutee asr = (ActorSelectionRoutee)r;
					System.out.println("Routee: " + r + " " + asr.selection() + " " + asr.selection().anchorPath() + " " + asr.selection().anchorPath().address());
				}
			}
		}, actorSystem.dispatcher());
		f.onFailure((new OnFailure() {
			@Override
			public void onFailure(Throwable t) throws Throwable {
				t.printStackTrace();
			}
		}), actorSystem.dispatcher());
	}
	
	private static ActorRef createRouter(String path, boolean local) {
		int total_instances = 100;
		String service_path = path;
		Iterable<String> paths = Collections.singletonList(service_path);
		
		String role = "myrole";
		ActorRef router = actorSystem.actorOf(new ClusterRouterGroup(new RoundRobinGroup(paths),
				new ClusterRouterGroupSettings(total_instances, paths, local, role)).props());
		
		return router;
	}

}
