Implement better self-leave for Akka clustering when the JVM goes down.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/33319f36 Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/33319f36 Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/33319f36 Branch: refs/heads/usergrid-1318-queue Commit: 33319f36a28b08f9230d5b6daabd19762b24058a Parents: 027e40d Author: Michael Russo <[email protected]> Authored: Mon Sep 26 15:41:53 2016 -0700 Committer: Michael Russo <[email protected]> Committed: Mon Sep 26 15:41:53 2016 -0700 ---------------------------------------------------------------------- .../actorsystem/ActorSystemManager.java | 2 +- .../actorsystem/ActorSystemManagerImpl.java | 21 ++++++++++---------- .../actorsystem/ClusterListener.java | 6 +++--- .../apache/usergrid/rest/ShutdownListener.java | 11 ++-------- 4 files changed, 17 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/usergrid/blob/33319f36/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManager.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManager.java b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManager.java index 17754f0..322ac6a 100644 --- a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManager.java +++ b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManager.java @@ -75,5 +75,5 @@ public interface ActorSystemManager { */ void publishToAllRegions( String topic, Object message, ActorRef sender ); - void shutdownAll(); + void leaveCluster(); } 
http://git-wip-us.apache.org/repos/asf/usergrid/blob/33319f36/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java ---------------------------------------------------------------------- diff --git a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java index 1021b1a..cc32d1c 100644 --- a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java +++ b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ActorSystemManagerImpl.java @@ -376,13 +376,13 @@ public class ActorSystemManagerImpl implements ActorSystemManager { } } - // add a shutdown hook to clean all actor systems if the JVM exits without the servlet container knowing -// Runtime.getRuntime().addShutdownHook(new Thread() { -// @Override -// public void run() { -// shutdownAll(); -// } -// }); + //add a shutdown hook to clean all actor systems if the JVM exits without the servlet container knowing + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override + public void run() { + leaveCluster(); + } + }); } @@ -466,10 +466,11 @@ public class ActorSystemManagerImpl implements ActorSystemManager { } @Override - public void shutdownAll(){ + public void leaveCluster(){ - logger.info("Shutting down Akka cluster: {}", clusterSystem.name()); - clusterSystem.shutdown(); + Cluster cluster = Cluster.get(clusterSystem); + logger.info("Downing self: {} from cluster: {}", cluster.selfAddress(), clusterSystem.name()); + cluster.leave(cluster.selfAddress()); } } http://git-wip-us.apache.org/repos/asf/usergrid/blob/33319f36/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ClusterListener.java 
---------------------------------------------------------------------- diff --git a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ClusterListener.java b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ClusterListener.java index d0a758d..15bc372 100644 --- a/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ClusterListener.java +++ b/stack/corepersistence/actorsystem/src/main/java/org/apache/usergrid/persistence/actorsystem/ClusterListener.java @@ -81,13 +81,13 @@ public class ClusterListener extends UntypedActor { logger.info("Unreachable member {} is accessible on the network.", event.member()); // logger.info("Unreachable member {} is accessible on the network, " + -// "application must have died. Marking member down", event.member()); +// "application must have died. Removing member ", event.member()); // -// cluster.down(event.member().address()); +// cluster.leave(event.member().address()); }else{ logger.warn("Unreachable member {} is not accessible on the network, " + - "there must be a network issue. Not marking member down", event.member()); + "there must be a network issue. 
Not removing member", event.member()); } http://git-wip-us.apache.org/repos/asf/usergrid/blob/33319f36/stack/rest/src/main/java/org/apache/usergrid/rest/ShutdownListener.java ---------------------------------------------------------------------- diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/ShutdownListener.java b/stack/rest/src/main/java/org/apache/usergrid/rest/ShutdownListener.java index f3707a7..8c96473 100644 --- a/stack/rest/src/main/java/org/apache/usergrid/rest/ShutdownListener.java +++ b/stack/rest/src/main/java/org/apache/usergrid/rest/ShutdownListener.java @@ -20,21 +20,14 @@ package org.apache.usergrid.rest; import com.google.inject.Injector; import org.apache.usergrid.batch.service.JobSchedulerService; -import org.apache.usergrid.batch.service.SchedulerService; import org.apache.usergrid.persistence.actorsystem.ActorSystemManager; -import org.apache.usergrid.persistence.cassandra.CassandraService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.context.ApplicationContext; -import org.springframework.context.ApplicationContextAware; import org.springframework.web.context.support.WebApplicationContextUtils; import javax.servlet.ServletContextEvent; import javax.servlet.ServletContextListener; -import javax.servlet.http.HttpSessionAttributeListener; -import javax.servlet.http.HttpSessionEvent; -import javax.servlet.http.HttpSessionListener; -import javax.servlet.http.HttpSessionBindingEvent; import java.util.Properties; @@ -71,8 +64,8 @@ public class ShutdownListener implements ServletContextListener { Injector injector = ctx.getBean(Injector.class); ActorSystemManager actorSystemManager = injector.getInstance(ActorSystemManager.class); - // stop the Akka actor system - //actorSystemManager.shutdownAll(); + // leave akka cluster + actorSystemManager.leaveCluster(); boolean started = Boolean.parseBoolean( properties.getProperty(JobServiceBoostrap.START_SCHEDULER_PROP, "true"));
