Each server subscribes to cluster events, and each event is received by that 
server's supervisor. When I say server, think micro-service; when I say 
supervisor, think of the supervisor for that micro-service's category, such 
as the Account supervisor or the Order supervisor. Each cluster event is 
handled accordingly. Other things happen too, of course: when a member joins 
the cluster it notifies the other members, saying "here is my supervisor 
ActorRef", and the other nodes send their caches to that ActorRef. The beauty 
of it is that an ActorRef can be used as a Map key, so if a node goes down I 
locate its key and remove the child keys under it. Here is partial but 
working production Java 8 code:

import akka.actor.*;
import akka.cluster.Cluster;
import akka.cluster.ClusterEvent.MemberEvent;
import akka.cluster.ClusterEvent.MemberUp;
import akka.cluster.Member;
import akka.dispatch.Futures;
import akka.dispatch.OnComplete;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import akka.pattern.Patterns;
import akka.routing.RoundRobinRoutingLogic;
import akka.routing.Router;
// company imports removed
import com.zaxxer.hikari.HikariDataSource;
import scala.concurrent.ExecutionContextExecutor;
import scala.concurrent.Future;

import javax.persistence.EntityManagerFactory;
import java.io.Serializable;
import java.util.*;
import java.util.concurrent.ConcurrentMap;

import static akka.actor.ActorRef.noSender;
import static akka.cluster.ClusterEvent.initialStateAsEvents;
// company static imports removed

/**
 * @author Guido Medina, created on 17/03/15.
 */
public abstract class SupervisorAbstractActor<
  K extends Serializable,
  T extends GenericEntity<K, T>,
  S extends SupervisorAbstractActor<K, T, S, C>,
  C extends ChildAbstractActor<K, T, S, C>> extends DefaultAbstractActor {

  public static final String ENGINE_DISPATCHER = "engine.dispatcher";
  public static final String DEFAULT_DISPATCHER = "akka.actor.default-dispatcher";

  public final SupervisorKey key;
  public final HikariDataSource ds;
  public final EntityManagerFactory emf;
  public final LoggingAdapter log;

  final ConcurrentMap<SupervisorType, Router> routers = newConcurrentMap();
  // supervisor type -> supervisor key -> (child id -> child ActorRef)
  final ComputingConcurrentMap<SupervisorType,
      ComputingConcurrentMap<SupervisorKey, ConcurrentMap<Integer, ActorRef>>> actors =
    newComputingConcurrentMap(supervisorType ->
      newComputingConcurrentMap(supervisorKey -> newConcurrentMap()));

  public SupervisorAbstractActor(SupervisorType type) {
    final ActorSystem system = context().system();
    log = Logging.getLogger(system, this);
    key = new SupervisorKey(type, self());
    try {
      ds = createDataSource(system.settings().config());
      emf = createEntityManagerFactory(ds);
    } catch (Exception e) {
      // Pass the Throwable first so the stack trace is logged, not treated as a template argument.
      log.error(e, "Error starting supervisor " + key);
      system.terminate();
      throw e;
    }
    Cluster.get(system).subscribe(key.supervisorRef, initialStateAsEvents(), MemberEvent.class);
  }

  protected abstract ChildActorCreator<K, T, S, C> creator(T entity);

  protected ActorRef createChildActor(ActorContext context, T entity) {
    return context.actorOf(
      Props.create(creator(entity)).withDispatcher(DEFAULT_DISPATCHER));
  }

  @Override
  public SupervisorStrategy supervisorStrategy() {
    return ALWAYS_RESUME_STRATEGY;
  }

  @Override
  public void postStop() {
    emf.close();
    ds.close();
  }

  private void createSupervisor(
      SupervisorCommand<ConcurrentMap<Integer, ActorRef>> command) {
    actors.computeIfAbsent(command.key.type).compute(command.key, (supervisor, children) -> {
      if (children == null) {
        if (command.key.type.distributionType.isRoundRobin) {
          routers.compute(command.key.type, (type, router) ->
            (router == null ? new Router(new RoundRobinRoutingLogic()) : router)
              .addRoutee(command.key.supervisorRef));
        }
        return command.value != null ? command.value : newConcurrentMap();
      } else {
        if (command.value != null) {
          children.putAll(command.value);
        }
        return children;
      }
    });
  }

  private void registerKey(Member member) {
    final Set<String> roles = member.getRoles();
    if (!roles.isEmpty()) {
      final String role = roles.iterator().next();
      if (SupervisorType.findBy(role) != key.type) {
        context().actorSelection(member.address() + "/user/" + role).
          tell(SupervisorCommand.registerKey(), key.supervisorRef);
      }
    }
  }

  private void registerKey(SupervisorCommand<SupervisorKey> supervisorCommand) {
    createSupervisor(SupervisorCommand.createSupervisor(supervisorCommand.key, null));
    final ConcurrentMap<Integer, ActorRef> cache = findCacheBy(key);
    supervisorCommand.key.supervisorRef.
      tell(SupervisorCommand.createSupervisor(key, cache.isEmpty() ? null : cache), noSender());
  }

  private void removeSupervisor(Member member) {
    final Set<String> roles = member.getRoles();
    if (!roles.isEmpty()) {
      final SupervisorType supervisorType = SupervisorType.findBy(roles.iterator().next());
      if (supervisorType != key.type) {
        final String address = member.address().toString();
        final ComputingConcurrentMap<SupervisorKey, ConcurrentMap<Integer, ActorRef>> cache =
          actors.computeIfAbsent(supervisorType);
        cache.keySet().stream().
          filter(supervisor ->
            supervisor.supervisorRef.path().address().toString().startsWith(address)).
          forEach(supervisor -> {
            // computeIfPresent returns the router as it is after the routee was removed.
            final Router updatedRouter = routers.computeIfPresent(supervisor.type,
              (type, router) -> router.removeRoutee(supervisor.supervisorRef));
            if (updatedRouter != null && updatedRouter.routees().isEmpty()) {
              routers.remove(supervisor.type);
            }
            cache.remove(supervisor);
          });
      }
    }
  }

  @SuppressWarnings("unchecked")
  private void createChild(SupervisorCommand<?> command) {
    final SupervisorCommand<ChildKey> childCommand;
    if (command.key == null) {
      childCommand = SupervisorCommand.createChild(key, (ChildKey) command.value);
      actors.entrySet().stream().
        filter(entry -> entry.getKey() != key.type).
        forEach(entry -> entry.getValue().keySet().stream().
          forEach(supervisor -> supervisor.supervisorRef.tell(childCommand, noSender())));
    } else {
      childCommand = (SupervisorCommand<ChildKey>) command;
    }
    actors.computeIfAbsent(childCommand.key.type).
      computeIfAbsent(childCommand.key).
      put(childCommand.value.id, childCommand.value.childRef);
  }

  @SuppressWarnings("unchecked")
  private void removeChild(SupervisorCommand<?> command) {
    final SupervisorCommand<ChildKey> childCommand;
    if (command.key == null) {
      childCommand = SupervisorCommand.removeChild(key, (ChildKey) command.value);
      actors.entrySet().stream().
        filter(entry -> entry.getKey() != key.type).
        forEach(entry -> entry.getValue().keySet().
          forEach(supervisor -> supervisor.supervisorRef.tell(childCommand, noSender())));
    } else {
      childCommand = (SupervisorCommand<ChildKey>) command;
    }
    actors.computeIfAbsent(childCommand.key.type).
      computeIfAbsent(childCommand.key).
      remove(childCommand.value.id);
  }

  public ConcurrentMap<Integer, ActorRef> findCacheBy(SupervisorKey supervisorKey) {
    return actors.computeIfAbsent(supervisorKey.type).computeIfAbsent(supervisorKey);
  }

  public Router findRouterBy(SupervisorType type) {
    return routers.get(type);
  }

  public Integers findActorIdsBy(SupervisorType type) {
    final Integers ids = new Integers();
    actors.computeIfAbsent(type).values().forEach(actorIds -> ids.addAll(actorIds.keySet()));
    return ids;
  }

  public ActorRef findActorBy(SupervisorType type, Integer id) {
    for (ConcurrentMap<Integer, ActorRef> actors : this.actors.computeIfAbsent(type).values()) {
      final ActorRef childRef = actors.get(id);
      if (childRef != null) {
        return childRef;
      }
    }
    return null;
  }

  public SupervisorKey findFirstSupervisorBy(SupervisorType type) {
    final ComputingConcurrentMap<SupervisorKey, ConcurrentMap<Integer, ActorRef>> supervisors =
      actors.get(type);
    return supervisors.isEmpty() ? null : supervisors.keySet().iterator().next();
  }

  @SuppressWarnings("unchecked")
  @Override
  public final void onReceive(Object message) {
    if (message instanceof SupervisorCommand<?>) {
      final SupervisorCommand<?> command = (SupervisorCommand<?>) message;
      switch (command.command) {
        case REGISTER_KEY:
          if (command.key == null) {
            sender().tell(SupervisorCommand.registerKey(key), noSender());
          } else {
            registerKey((SupervisorCommand<SupervisorKey>) message);
          }
          break;
        case CREATE_SUPERVISOR:
          createSupervisor((SupervisorCommand<ConcurrentMap<Integer, ActorRef>>) command);
          break;
        case CREATE_CHILD:
          createChild(command);
          break;
        case REMOVE_CHILD:
          removeChild(command);
          break;
        case RELOAD_CONFIG: {
          emf.getCache().evictAll();
          reloadCaches();
          final List<Future<Boolean>> futures = new ArrayList<>();
          final SupervisorCommand<Integers> configCommand =
            (SupervisorCommand<Integers>) command;
          final Integers currentActors = findActorIdsBy(key.type);
          currentActors.minus(configCommand.value).
            forEach(id -> {
              final ActorRef child = findActorBy(key.type, id);
              if (child != null) {
                futures.add(Patterns.gracefulStop(child, FIFTEEN_SECONDS));
              }
            });

          final ActorContext context = context();
          final ExecutionContextExecutor dispatcher = context.system().dispatcher();
          Futures.sequence(futures, dispatcher).onComplete(
            new OnComplete<Iterable<Boolean>>() {
              @Override
              public void onComplete(Throwable failure, Iterable<Boolean> success) {
                // Caution: this callback runs on a dispatcher thread, outside the actor,
                // and ActorContext is not thread-safe; sending a message back to self()
                // and creating the children there would be the safer variant.
                try (CloseableEntityManager em =
                       new CloseableEntityManager(emf.createEntityManager())) {
                  key.type.findEnabledEntitiesBy(em.em, configCommand.value.minus(currentActors)).
                    forEach(entity -> createChildActor(context, (T) entity));
                }
              }
            }, dispatcher);
          break;
        }
        case RELOAD_CACHES:
          emf.getCache().evictAll();
          reloadCaches();
          break;
      }
      log.info(command.toString());
    } else if (message instanceof MemberEvent) {
      if (message instanceof MemberUp) {
        registerKey(((MemberUp) message).member());
      } else {
        removeSupervisor(((MemberEvent) message).member());
      }
    } else {
      processMessage(message);
    }
  }

  protected void reloadCaches() {
  }

  public void processMessage(Object message) {
    unhandled(message);
  }
}



*Note:* I pasted the code stripped of its business context because I'm not 
allowed to share more, so I hope it helps. By the way, you can keep hundreds 
of thousands of ActorRefs in memory; that's no problem.
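
For readers without the company classes, here is a minimal, Akka-free sketch of the cache shape described above: category, then supervisor key, then child id. Everything in it is hypothetical: `SupervisorCacheSketch`, `Key`, `putChild`, `findChild` and `removeNode` are illustrative names, and plain Strings stand in for ActorRefs. It only shows why removing a supervisor key invalidates the whole branch of children cached under it.

```java
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical, Akka-free model of the supervisor cache; Strings stand in for ActorRefs.
class SupervisorCacheSketch {

  // Stand-in for SupervisorKey: the category plus the node address of the supervisor.
  static final class Key {
    final String type;
    final String address;

    Key(String type, String address) {
      this.type = type;
      this.address = address;
    }

    @Override
    public boolean equals(Object o) {
      return o instanceof Key
        && type.equals(((Key) o).type)
        && address.equals(((Key) o).address);
    }

    @Override
    public int hashCode() {
      return Objects.hash(type, address);
    }
  }

  // category -> supervisor key -> (child id -> child reference)
  private final ConcurrentMap<String, ConcurrentMap<Key, ConcurrentMap<Integer, String>>> actors =
    new ConcurrentHashMap<>();

  // Records "child id X of this supervisor = reference Y".
  void putChild(Key supervisor, int id, String childRef) {
    actors.computeIfAbsent(supervisor.type, type -> new ConcurrentHashMap<>())
      .computeIfAbsent(supervisor, key -> new ConcurrentHashMap<>())
      .put(id, childRef);
  }

  // Looks the id up under every supervisor of the category; null when it is not cached.
  String findChild(String type, int id) {
    final ConcurrentMap<Key, ConcurrentMap<Integer, String>> bySupervisor = actors.get(type);
    if (bySupervisor != null) {
      for (ConcurrentMap<Integer, String> children : bySupervisor.values()) {
        final String ref = children.get(id);
        if (ref != null) {
          return ref;
        }
      }
    }
    return null;
  }

  // Node down: dropping the supervisor keys of that node drops all their children too.
  void removeNode(String address) {
    actors.values().forEach(bySupervisor ->
      bySupervisor.keySet().removeIf(key -> key.address.equals(address)));
  }

  public static void main(String[] args) {
    final SupervisorCacheSketch cache = new SupervisorCacheSketch();
    final Key nodeA = new Key("account", "akka.tcp://app@10.0.0.1:2551");
    final Key nodeB = new Key("account", "akka.tcp://app@10.0.0.2:2551");
    cache.putChild(nodeA, 1, "account-1@nodeA");
    cache.putChild(nodeB, 2, "account-2@nodeB");
    cache.removeNode("akka.tcp://app@10.0.0.1:2551");
    System.out.println(cache.findChild("account", 1)); // null
    System.out.println(cache.findChild("account", 2)); // account-2@nodeB
  }
}
```

The real class keys the inner maps by a SupervisorKey that wraps the supervisor's ActorRef; the String address here is only a simplification.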

On Tuesday, November 10, 2015 at 12:27:02 PM UTC, Kostas kougios wrote:
>
> Hi Guido, mine is a similar use case. One difference is that I store the 
> ActorRefs in files on disk, as I expect to have a lot of them and don't 
> want to keep them all in memory. I expect to have, say, 1000 servers with 
> 1000 ActorRefs per server for one "index" of mine (index = database index, 
> as I am building a database). This makes batch updates of the actor refs 
> costly. So what I have ended up doing so far is serializing the actor path 
> (after injecting the real server RootAddress into it). Then I resolveOne 
> the ActorRef, keep that ref in memory for 10 seconds, and then resolveOne 
> again. This means my servers don't have to communicate up/down events, and 
> it also works if servers crash or restart. The whole trick is achieved by 
> replacing ActorRef with EndpointActorRef (a class of mine), which does the 
> actorPath manipulation and takes care of serializing the actorPath instead 
> of the actorRef.
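
The "resolve, keep for 10 seconds, resolve again" idea can be sketched in plain Java as below. This is not Kostas's EndpointActorRef; `ExpiringRefCache`, its `resolver` function and its `clockMillis` supplier are hypothetical names, and the resolver is synchronous here, whereas Akka's resolveOne returns a future.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;
import java.util.function.LongSupplier;

// Hypothetical sketch: caches a resolved reference per path and re-resolves it once it is stale.
class ExpiringRefCache<R> {

  // A resolved reference together with the time it was resolved at.
  private static final class Entry<R> {
    final R ref;
    final long resolvedAt;

    Entry(R ref, long resolvedAt) {
      this.ref = ref;
      this.resolvedAt = resolvedAt;
    }
  }

  private final ConcurrentMap<String, Entry<R>> entries = new ConcurrentHashMap<>();
  private final Function<String, R> resolver; // stand-in for an actor-path lookup
  private final LongSupplier clockMillis;     // injectable clock, e.g. System::currentTimeMillis
  private final long ttlMillis;

  ExpiringRefCache(Function<String, R> resolver, LongSupplier clockMillis, long ttlMillis) {
    this.resolver = resolver;
    this.clockMillis = clockMillis;
    this.ttlMillis = ttlMillis;
  }

  // Returns the cached reference while it is younger than the TTL, otherwise resolves again.
  R get(String path) {
    final long now = clockMillis.getAsLong();
    return entries.compute(path, (p, old) ->
      old != null && now - old.resolvedAt < ttlMillis
        ? old
        : new Entry<>(resolver.apply(p), now)).ref;
  }

  public static void main(String[] args) {
    final long[] now = {0};
    final int[] lookups = {0};
    final ExpiringRefCache<String> cache =
      new ExpiringRefCache<>(path -> path + "#" + (++lookups[0]), () -> now[0], 10_000);
    System.out.println(cache.get("/user/index/1")); // /user/index/1#1
    now[0] = 5_000;
    System.out.println(cache.get("/user/index/1")); // /user/index/1#1 (still cached)
    now[0] = 10_000;
    System.out.println(cache.get("/user/index/1")); // /user/index/1#2 (re-resolved)
  }
}
```

The trade-off is the one discussed in this thread: no up/down notifications between servers are needed, at the cost of a repeated lookup and of a window of up to one TTL in which a cached reference may point at a dead actor.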
>
> Overall I might end up serializing the actorPath even for actors in the 
> same JVM if the actor has been inactive for a long time. This means actors 
> might come and go quite frequently even while the servers are up.
>
> What happens to your cache if a server crashes? Does the cache remove the 
> invalid actor refs somehow?
>
> Cheers
>
>
> On 09/11/15 23:26, Guido Medina wrote:
>
> Hi Kostas, 
>
> I have a few micro-services and some supervisor actors that inherit from 
> an AbstractSupervisor template, which uses local caches per micro-service 
> with an optimistic approach. An example, with requirements:
>
> 1) There is a uniform pattern where each micro-service is an independent 
> category, and each category can be either a round-robin supervisor only or 
> a supervisor with children (sharding).
> 2) Each category informs the other categories, using cluster events, when 
> a supervisor and/or any of its children comes up.
> 3) If a supervisor dies, its children die too, which means the supervisors 
> in other categories are informed and hence the cache for that supervisor is 
> invalidated.
> 4) Each child has an Integer ID, say category account and accounts 1, 
> 2, ..., 10, so I don't bother giving children a path; I know a child 
> of account is ../account/$blah blah incarnation. What I do is inform the 
> caches of the other micro-services that Account ID X = ActorRef Y.
>
> All this would suggest that cluster sharding is what I use, but no; it is 
> very similar, but I have my own implementation, constrained more to DDD.
>
> I have probably confused you more than I have helped; all I wanted to say 
> is that caching a key with its corresponding ActorRef is not a bad idea. Of 
> course, if one of your micro-services goes down you need to invalidate the 
> caches in order to avoid actor selection calls, which I would discourage. 
> Instead (though not for free, since I had to put a lot of work into 
> contemplating different scenarios), take an inventory of your events and 
> react to them.
>
> In particular, I designed the application to build the caches using 
> cluster events and the template pattern (preStart and postStop of some 
> types of actors inform the other caches).
>
> Having a uniform hierarchy, like supervisor -> children, also helps, 
> because you can add or invalidate a whole branch (I do that too with my 
> mini-framework).
>
> Hope all these crazy ideas can help you.
>
> Best regards,
>
> Guido.
>
> On Friday, November 6, 2015 at 4:15:24 PM UTC, Kostas kougios wrote: 
>>
>> Well, I refactored my code to cache actor paths, but during creation all 
>> actor paths are local, and when they are transmitted over the wire they 
>> remain local, so they point to the wrong path.
>>
>> I've manually modified the address part of the path myself so that it 
>> contains the host and port of the server. But the process seems awkward; am 
>> I missing something?
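
The manual rewrite described here amounts to turning a local-form path such as `akka://system/user/...` into `protocol://system@host:port/user/...`. The sketch below does that with plain string handling so it runs without Akka; `RemotePathSketch` and `withRemoteAddress` are hypothetical names and not library API. In Akka itself, `ActorPath.toStringWithAddress(Address)` and `Serialization.serializedActorPath(ActorRef)` exist for this purpose, as far as I know, and would normally be preferable to hand-rolled string manipulation.

```java
// Hypothetical helper: plain string manipulation, not an Akka API.
class RemotePathSketch {

  // Injects host and port into a local-form actor path; other paths are returned unchanged.
  static String withRemoteAddress(String path, String protocol, String host, int port) {
    final String localPrefix = "akka://";
    if (!path.startsWith(localPrefix)) {
      return path; // e.g. already akka.tcp://..., leave untouched
    }
    final int slash = path.indexOf('/', localPrefix.length());
    final String system = slash < 0
      ? path.substring(localPrefix.length())
      : path.substring(localPrefix.length(), slash);
    if (system.contains("@")) {
      return path; // already carries a host and port
    }
    final String rest = slash < 0 ? "" : path.substring(slash);
    return protocol + "://" + system + "@" + host + ":" + port + rest;
  }

  public static void main(String[] args) {
    System.out.println(withRemoteAddress("akka://shop/user/account/$a", "akka.tcp", "10.0.0.5", 2552));
    // akka.tcp://shop@10.0.0.5:2552/user/account/$a
  }
}
```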
>>
>> -- 
> >>>>>>>>>> Read the docs: http://akka.io/docs/
> >>>>>>>>>> Check the FAQ: 
> http://doc.akka.io/docs/akka/current/additional/faq.html
> >>>>>>>>>> Search the archives: 
> https://groups.google.com/group/akka-user
> --- 
> You received this message because you are subscribed to a topic in the 
> Google Groups "Akka User List" group.
> To unsubscribe from this topic, visit 
> https://groups.google.com/d/topic/akka-user/n8HlIIkZhCs/unsubscribe.
> To unsubscribe from this group and all its topics, send an email to 
> akka-user+...@googlegroups.com.
> To post to this group, send email to akka...@googlegroups.com.
> Visit this group at http://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>
>
>
