This is an automated email from the ASF dual-hosted git repository. mblow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit d11fdfb13106eb908ceb7e286536a52c14987526 Author: Michael Blow <[email protected]> AuthorDate: Thu Feb 13 13:12:57 2020 -0500 [NO ISSUE][CLUS] Ensure ClusterControllerService is stopped on JVM exit Change-Id: Ieb46009110c9dc98c3c476f6b153d8ac51b5c927 Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/5024 Integration-Tests: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> Reviewed-by: Michael Blow <[email protected]> Reviewed-by: Till Westmann <[email protected]> --- .../asterix/hyracks/bootstrap/CCApplication.java | 3 - .../asterix/hyracks/bootstrap/NCApplication.java | 7 +- .../api/common/AsterixHyracksIntegrationUtil.java | 1 + .../asterix/test/base/AsterixTestHelper.java | 1 - .../hyracks/api/service/IControllerService.java | 2 + .../org/apache/hyracks/api/util/InvokeUtil.java | 11 +-- .../hyracks/control/cc/BaseCCApplication.java | 30 ++++++-- .../control/cc/ClusterControllerService.java | 5 ++ .../control/cc/work/ClusterShutdownWork.java | 9 ++- .../control/common/ControllerShutdownHook.java} | 18 ++--- .../shutdown/IShutdownStatusConditionVariable.java | 3 +- .../control/common/shutdown/ShutdownRun.java | 32 ++++++--- .../hyracks/control/nc/BaseNCApplication.java | 26 ++++++- .../hyracks/control/nc/NodeControllerService.java | 84 ++++++++++------------ .../java/org/apache/hyracks/util/ExitUtil.java | 22 ++++++ 15 files changed, 164 insertions(+), 90 deletions(-) diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java index e2fbe35..26c092f 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java @@ -85,7 +85,6 @@ import org.apache.asterix.translator.IStatementExecutorFactory; import org.apache.asterix.translator.Receptionist; import org.apache.asterix.util.MetadataBuiltinFunctions; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; -import org.apache.hyracks.api.application.ICCServiceContext; import org.apache.hyracks.api.application.IServiceContext; import org.apache.hyracks.api.client.IHyracksClientConnection; import org.apache.hyracks.api.config.IConfigManager; @@ -112,7 +111,6 @@ public class CCApplication extends BaseCCApplication { private static final Logger LOGGER = LogManager.getLogger(); private static IAsterixStateProxy proxy; - protected ICCServiceContext ccServiceCtx; protected CCExtensionManager ccExtensionManager; protected IStorageComponentProvider componentProvider; protected WebManager webManager; @@ -123,7 +121,6 @@ public class CCApplication extends BaseCCApplication { @Override public void init(IServiceContext serviceCtx) throws Exception { super.init(serviceCtx); - ccServiceCtx = (ICCServiceContext) serviceCtx; ccServiceCtx.setThreadFactory( new AsterixThreadFactory(ccServiceCtx.getThreadFactory(), new LifeCycleComponentManager())); validateEnvironment(); diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java index eba2049..2e5c09c 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java @@ -72,7 +72,6 @@ import org.apache.asterix.messaging.NCMessageBroker; import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository; import org.apache.asterix.translator.Receptionist; import org.apache.asterix.util.MetadataBuiltinFunctions; -import org.apache.hyracks.api.application.INCServiceContext; import org.apache.hyracks.api.application.IServiceContext; import org.apache.hyracks.api.client.NodeStatus; import org.apache.hyracks.api.config.IConfigManager; @@ -94,8 +93,6 @@ import org.apache.logging.log4j.Logger; public class NCApplication extends BaseNCApplication { private static final Logger LOGGER = LogManager.getLogger(); - - protected INCServiceContext ncServiceCtx; protected NCExtensionManager ncExtensionManager; private INcApplicationContext runtimeContext; private String nodeId; @@ -111,7 +108,7 @@ public class NCApplication extends BaseNCApplication { @Override public void init(IServiceContext serviceCtx) throws Exception { - ncServiceCtx = (INCServiceContext) serviceCtx; + super.init(serviceCtx); configureLoggingLevel(ncServiceCtx.getAppConfig().getLoggingLevel(ExternalProperties.Option.LOG_LEVEL)); // set the node status initially to idle to indicate that it is pending booting ((NodeControllerService) serviceCtx.getControllerService()).setNodeStatus(NodeStatus.IDLE); @@ -240,6 +237,7 @@ public class NCApplication extends BaseNCApplication { LOGGER.info("Duplicate attempt to stop ignored: " + nodeId); } } + super.stop(); } @Override @@ -249,6 +247,7 @@ public class NCApplication extends BaseNCApplication { @Override public synchronized void startupCompleted() throws Exception { + super.startupCompleted(); // configure servlets after joining the cluster, so we can create HyracksClientConnection configureServers(); webManager.start(); diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java index 4837a98..df16daf 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java @@ -426,6 +426,7 @@ public class AsterixHyracksIntegrationUtil { public void stop() throws Exception { // ungraceful shutdown webManager.stop(); + super.stop(); } } diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/base/AsterixTestHelper.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/base/AsterixTestHelper.java index 3840a00..cec0de7 100644 --- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/base/AsterixTestHelper.java +++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/base/AsterixTestHelper.java @@ -88,7 +88,6 @@ public class AsterixTestHelper { deepSelectiveCopy(child, destChild, filter); } else if (filter.accept(child)) { FileUtil.safeCopyFile(child, destChild); - return; } } } diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/service/IControllerService.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/service/IControllerService.java index 018f9fe..c92c677 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/service/IControllerService.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/service/IControllerService.java @@ -25,6 +25,8 @@ import org.apache.hyracks.api.application.IServiceContext; import org.apache.hyracks.api.network.INetworkSecurityManager; public interface IControllerService { + String getId(); + void start() throws Exception; void stop() throws Exception; diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java index ffd2956..5d52ed9 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java @@ -53,7 +53,7 @@ public class InvokeUtil { * completes, the current thread will be re-interrupted, if the original operation was interrupted. */ public static void doUninterruptibly(InterruptibleAction interruptible) { - boolean interrupted = false; + boolean interrupted = Thread.interrupted(); try { while (true) { try { @@ -75,7 +75,7 @@ public class InvokeUtil { * completes, the current thread will be re-interrupted, if the original operation was interrupted. */ public static void doExUninterruptibly(ThrowingAction interruptible) throws Exception { - boolean interrupted = false; + boolean interrupted = Thread.interrupted(); try { while (true) { try { @@ -98,7 +98,7 @@ public class InvokeUtil { * @return true if the original operation was interrupted, otherwise false */ public static boolean doUninterruptiblyGet(InterruptibleAction interruptible) { - boolean interrupted = false; + boolean interrupted = Thread.interrupted(); while (true) { try { interruptible.run(); @@ -117,7 +117,7 @@ public class InvokeUtil { * @return true if the original operation was interrupted, otherwise false */ public static boolean doExUninterruptiblyGet(Callable<Void> interruptible) throws Exception { - boolean interrupted = false; + boolean interrupted = Thread.interrupted(); boolean success = false; while (true) { try { @@ -168,7 +168,7 @@ public class InvokeUtil { * the original operation was interrupted. */ public static void doIoUninterruptibly(IOInterruptibleAction interruptible) throws IOException { - boolean interrupted = false; + boolean interrupted = Thread.interrupted(); try { while (true) { try { @@ -177,6 +177,7 @@ public class InvokeUtil { } catch (ClosedByInterruptException | InterruptedException e) { LOGGER.error("IO operation Interrupted. Retrying..", e); interrupted = true; + //noinspection ResultOfMethodCallIgnored Thread.interrupted(); } } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/BaseCCApplication.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/BaseCCApplication.java index b41d5a2..efd42c9 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/BaseCCApplication.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/BaseCCApplication.java @@ -21,6 +21,7 @@ package org.apache.hyracks.control.cc; import java.util.Arrays; import org.apache.hyracks.api.application.ICCApplication; +import org.apache.hyracks.api.application.ICCServiceContext; import org.apache.hyracks.api.application.IServiceContext; import org.apache.hyracks.api.config.IConfigManager; import org.apache.hyracks.api.config.Section; @@ -29,23 +30,27 @@ import org.apache.hyracks.api.job.resource.DefaultJobCapacityController; import org.apache.hyracks.api.job.resource.IJobCapacityController; import org.apache.hyracks.api.result.IJobResultCallback; import org.apache.hyracks.api.util.HyracksConstants; +import org.apache.hyracks.control.common.ControllerShutdownHook; import org.apache.hyracks.control.common.controllers.CCConfig; import org.apache.hyracks.control.common.controllers.ControllerConfig; import org.apache.hyracks.control.common.controllers.NCConfig; +import org.apache.hyracks.util.ExitUtil; import org.apache.hyracks.util.LoggingConfigUtil; import org.apache.logging.log4j.Level; public class BaseCCApplication implements ICCApplication { public static final ICCApplication INSTANCE = new BaseCCApplication(); + protected ICCServiceContext ccServiceCtx; private IConfigManager configManager; + private Thread shutdownHook; protected BaseCCApplication() { } @Override public void init(IServiceContext serviceCtx) throws Exception { - // no-op + ccServiceCtx = (ICCServiceContext) serviceCtx; } @Override @@ -56,13 +61,28 @@ public class BaseCCApplication implements ICCApplication { } @Override - public void stop() throws Exception { - // no-op + public void startupCompleted() throws Exception { + installShutdownHook(); } @Override - public void startupCompleted() throws Exception { - // no-op + public void stop() throws Exception { + uninstallShutdownHook(); + } + + protected Thread createShutdownHook() { + return new ControllerShutdownHook(ccServiceCtx); + } + + protected void installShutdownHook() { + shutdownHook = createShutdownHook(); + ExitUtil.registerShutdownHook(shutdownHook); + } + + protected void uninstallShutdownHook() { + if (shutdownHook != null) { + ExitUtil.unregisterShutdownHook(shutdownHook); + } } @Override diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java index 1e92f16..15fab85 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java @@ -209,6 +209,11 @@ public class ClusterControllerService implements IControllerService { } @Override + public String getId() { + return "ClusterControllerService"; + } + + @Override public void start() throws Exception { LOGGER.log(Level.INFO, "Starting ClusterControllerService: " + this); serverCtx = new ServerContext(ServerContext.ServerType.CLUSTER_CONTROLLER, new File(ccConfig.getRootDir())); diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ClusterShutdownWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ClusterShutdownWork.java index 194d27f..9c504ed 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ClusterShutdownWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ClusterShutdownWork.java @@ -69,7 +69,9 @@ public class ClusterShutdownWork extends SynchronizableWork { */ nodeManager.apply(this::shutdownNode); - ccs.getExecutor().execute(() -> { + // complete the rest of the tasks in a separate standalone thread, to better allow our worker & executor + // queues to drain during shutdown + Thread finalWork = new Thread(() -> { try { /* * wait for all our acks @@ -88,8 +90,11 @@ public class ClusterShutdownWork extends SynchronizableWork { ExitUtil.exit(cleanShutdown ? EC_NORMAL_TERMINATION : EC_ABNORMAL_TERMINATION); } catch (Exception e) { callback.setException(e); + } finally { + shutdownStatus.notifyCcStopComplete(); } - }); + }, getClass().getSimpleName() + "-Helper"); + finalWork.start(); } catch (Exception e) { callback.setException(e); } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NCShutdownHook.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ControllerShutdownHook.java similarity index 73% rename from hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NCShutdownHook.java rename to hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ControllerShutdownHook.java index ffa02d6..806e42e 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NCShutdownHook.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ControllerShutdownHook.java @@ -16,26 +16,28 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.hyracks.control.nc; +package org.apache.hyracks.control.common; +import org.apache.hyracks.api.application.IServiceContext; +import org.apache.hyracks.api.service.IControllerService; import org.apache.hyracks.util.ThreadDumpUtil; import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; /** - * Shutdown hook that invokes {@link NodeControllerService#stop() stop} method. + * Shutdown hook that invokes {@link IControllerService#stop() stop} method. * This shutdown hook must have a failsafe mechanism to halt the process in case the shutdown * operation is hanging for any reason */ -public class NCShutdownHook extends Thread { +public class ControllerShutdownHook extends Thread { private static final Logger LOGGER = LogManager.getLogger(); - private final NodeControllerService nodeControllerService; + private final IControllerService controllerService; - NCShutdownHook(NodeControllerService nodeControllerService) { - super("ShutdownHook-" + nodeControllerService.getId()); - this.nodeControllerService = nodeControllerService; + public ControllerShutdownHook(IServiceContext serviceCtx) { + super("ShutdownHook-" + serviceCtx.getControllerService().getId()); + this.controllerService = serviceCtx.getControllerService(); } @Override @@ -46,7 +48,7 @@ public class NCShutdownHook extends Thread { } catch (Throwable th) {//NOSONAR } LOGGER.log(Level.DEBUG, () -> "Thread dump at shutdown: " + ThreadDumpUtil.takeDumpString()); - nodeControllerService.stop(); + controllerService.stop(); } catch (Throwable th) { // NOSONAR... This is fine since this is shutdown hook LOGGER.log(Level.WARN, "Exception in executing shutdown hook", th); } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/shutdown/IShutdownStatusConditionVariable.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/shutdown/IShutdownStatusConditionVariable.java index 26351e2..35d6393 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/shutdown/IShutdownStatusConditionVariable.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/shutdown/IShutdownStatusConditionVariable.java @@ -23,6 +23,5 @@ public interface IShutdownStatusConditionVariable { * @return true if all nodes ack shutdown * @throws Exception */ - public boolean waitForCompletion() throws Exception; - + boolean waitForCompletion() throws Exception; } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/shutdown/ShutdownRun.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/shutdown/ShutdownRun.java index e210963..0a54d1a 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/shutdown/ShutdownRun.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/shutdown/ShutdownRun.java @@ -24,11 +24,13 @@ import java.util.Set; import java.util.TreeSet; import java.util.concurrent.TimeUnit; +import org.apache.hyracks.util.Span; + public class ShutdownRun implements IShutdownStatusConditionVariable { private final Set<String> shutdownNodeIds = new TreeSet<>(); - private boolean shutdownSuccess = false; - private static final long SHUTDOWN_TIMER_MS = TimeUnit.SECONDS.toMillis(30); + private boolean ccStopComplete = false; + private static final long SHUTDOWN_TIMEOUT_SECONDS = 60; public ShutdownRun(Collection<String> nodeIds) { shutdownNodeIds.addAll(nodeIds); @@ -42,26 +44,38 @@ public class ShutdownRun implements IShutdownStatusConditionVariable { public synchronized void notifyShutdown(String nodeId) { shutdownNodeIds.remove(nodeId); if (shutdownNodeIds.isEmpty()) { - shutdownSuccess = true; notifyAll(); } } @Override public synchronized boolean waitForCompletion() throws Exception { - if (shutdownNodeIds.isEmpty()) { - shutdownSuccess = true; - } else { + Span span = Span.start(SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS); + while (!span.elapsed()) { + if (shutdownNodeIds.isEmpty()) { + return true; + } /* - * Either be woken up when we're done, or default to fail. + * Either be woken up when we're done, or after (remaining) timeout has elapsed */ - wait(SHUTDOWN_TIMER_MS); + span.wait(this); } - return shutdownSuccess; + return false; } public synchronized Set<String> getRemainingNodes() { return shutdownNodeIds; } + public synchronized void notifyCcStopComplete() { + ccStopComplete = true; + notifyAll(); + } + + public synchronized boolean waitForCcStopCompletion() throws Exception { + while (!ccStopComplete) { + wait(); + } + return true; + } } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/BaseNCApplication.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/BaseNCApplication.java index 882c396..b020cde 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/BaseNCApplication.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/BaseNCApplication.java @@ -22,6 +22,7 @@ import java.lang.management.ManagementFactory; import java.util.Arrays; import org.apache.hyracks.api.application.INCApplication; +import org.apache.hyracks.api.application.INCServiceContext; import org.apache.hyracks.api.application.IServiceContext; import org.apache.hyracks.api.config.IConfigManager; import org.apache.hyracks.api.config.Section; @@ -29,24 +30,28 @@ import org.apache.hyracks.api.control.CcId; import org.apache.hyracks.api.io.IFileDeviceResolver; import org.apache.hyracks.api.job.resource.NodeCapacity; import org.apache.hyracks.api.util.HyracksConstants; +import org.apache.hyracks.control.common.ControllerShutdownHook; import org.apache.hyracks.control.common.config.ConfigManager; import org.apache.hyracks.control.common.controllers.CCConfig; import org.apache.hyracks.control.common.controllers.ControllerConfig; import org.apache.hyracks.control.common.controllers.NCConfig; import org.apache.hyracks.control.nc.io.DefaultDeviceResolver; +import org.apache.hyracks.util.ExitUtil; import org.apache.hyracks.util.LoggingConfigUtil; import org.apache.logging.log4j.Level; public class BaseNCApplication implements INCApplication { public static final BaseNCApplication INSTANCE = new BaseNCApplication(); + protected INCServiceContext ncServiceCtx; private ConfigManager configManager; + private Thread shutdownHook; protected BaseNCApplication() { } @Override public void init(IServiceContext serviceCtx) throws Exception { - // no-op + ncServiceCtx = (INCServiceContext) serviceCtx; } @Override @@ -58,7 +63,7 @@ public class BaseNCApplication implements INCApplication { @Override public void startupCompleted() throws Exception { - // no-op + installShutdownHook(); } @Override @@ -68,7 +73,7 @@ public class BaseNCApplication implements INCApplication { @Override public void stop() throws Exception { - // no-op + uninstallShutdownHook(); } @Override @@ -76,6 +81,21 @@ public class BaseNCApplication implements INCApplication { // no-op } + protected Thread createShutdownHook() { + return new ControllerShutdownHook(ncServiceCtx); + } + + protected void installShutdownHook() { + shutdownHook = createShutdownHook(); + ExitUtil.registerShutdownHook(shutdownHook); + } + + protected void uninstallShutdownHook() { + if (shutdownHook != null) { + ExitUtil.unregisterShutdownHook(shutdownHook); + } + } + @Override public NodeCapacity getCapacity() { int allCores = ManagementFactory.getOperatingSystemMXBean().getAvailableProcessors(); diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java index d2ff66e..fec6e14 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java @@ -187,8 +187,6 @@ public class NodeControllerService implements IControllerService { ExitUtil.init(); } - private NCShutdownHook ncShutdownHook; - public NodeControllerService(NCConfig config) throws Exception { this(config, getApplication(config)); } @@ -210,9 +208,6 @@ public class NodeControllerService implements IControllerService { if (LOGGER.isInfoEnabled()) { LOGGER.info("Setting uncaught exception handler " + getLifeCycleComponentManager()); } - // Set shutdown hook before so it doesn't have the same uncaught exception handler - ncShutdownHook = new NCShutdownHook(this); - Runtime.getRuntime().addShutdownHook(ncShutdownHook); Thread.currentThread().setUncaughtExceptionHandler(getLifeCycleComponentManager()); ioManager = new IOManager(IODeviceHandle.getDevices(ncConfig.getIODevices()), application.getFileDeviceResolver(), ncConfig.getIOParallelism(), ncConfig.getIOQueueSize()); @@ -488,54 +483,47 @@ public class NodeControllerService implements IControllerService { @Override public synchronized void stop() throws Exception { - if (shutdownCallStack == null) { - shutdownCallStack = new Throwable().getStackTrace(); - LOGGER.log(Level.INFO, "Stopping NodeControllerService"); - application.preStop(); - executor.shutdownNow(); - if (!executor.awaitTermination(10, TimeUnit.SECONDS)) { - LOGGER.log(Level.ERROR, "Some jobs failed to exit, continuing with abnormal shutdown"); - } - partitionManager.close(); - resultPartitionManager.close(); - netManager.stop(); - resultNetworkManager.stop(); - if (messagingNetManager != null) { - messagingNetManager.stop(); - } - workQueue.stop(); - application.stop(); - /* - * Stop heartbeats only after NC has stopped to avoid false node failure detection - * on CC if an NC takes a long time to stop. - */ - heartbeatManagers.values().parallelStream().forEach(HeartbeatManager::shutdown); - synchronized (ccLock) { - ccMap.values().parallelStream().forEach(cc -> { - try { - cc.getClusterControllerService().notifyShutdown(id); - } catch (Exception e) { - LOGGER.log(Level.WARN, "Exception notifying CC of shutdown", e); - } - }); - } - ipc.stop(); - ioManager.close(); - LOGGER.log(Level.INFO, "Stopped NodeControllerService"); - } else { - LOGGER.log(Level.ERROR, "Duplicate shutdown call; original: " + Arrays.toString(shutdownCallStack), + if (shutdownCallStack != null) { + LOGGER.error("Duplicate shutdown call; original: " + Arrays.toString(shutdownCallStack), new Exception("Duplicate shutdown call")); + return; } - if (ncShutdownHook != null) { - try { - Runtime.getRuntime().removeShutdownHook(ncShutdownHook); - LOGGER.info("removed shutdown hook for {}", id); - } catch (IllegalStateException e) { - LOGGER.log(Level.DEBUG, "ignoring exception while attempting to remove shutdown hook", e); - } + shutdownCallStack = new Throwable().getStackTrace(); + LOGGER.info("Stopping NodeControllerService"); + application.preStop(); + executor.shutdownNow(); + if (!executor.awaitTermination(10, TimeUnit.SECONDS)) { + LOGGER.log(Level.ERROR, "Some jobs failed to exit, continuing with abnormal shutdown"); + } + partitionManager.close(); + resultPartitionManager.close(); + netManager.stop(); + resultNetworkManager.stop(); + if (messagingNetManager != null) { + messagingNetManager.stop(); } + workQueue.stop(); + application.stop(); + /* + * Stop heartbeats only after NC has stopped to avoid false node failure detection + * on CC if an NC takes a long time to stop. + */ + heartbeatManagers.values().parallelStream().forEach(HeartbeatManager::shutdown); + synchronized (ccLock) { + ccMap.values().parallelStream().forEach(cc -> { + try { + cc.getClusterControllerService().notifyShutdown(id); + } catch (Exception e) { + LOGGER.log(Level.WARN, "Exception notifying CC of shutdown", e); + } + }); + } + ipc.stop(); + ioManager.close(); + LOGGER.info("Stopped NodeControllerService"); } + @Override public String getId() { return id; } diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java index 680d55e..f8bc9f9 100644 --- a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java +++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ExitUtil.java @@ -104,6 +104,28 @@ public class ExitUtil { Runtime.getRuntime().halt(status); } + public static boolean registerShutdownHook(Thread shutdownHook) { + try { + Runtime.getRuntime().addShutdownHook(shutdownHook); + LOGGER.info("successfully registered shutdown hook {}", shutdownHook); + return true; + } catch (Exception e) { + LOGGER.warn("unable to register shutdown hook {}", shutdownHook, e); + return false; + } + } + + public static boolean unregisterShutdownHook(Thread shutdownHook) { + try { + boolean success = Runtime.getRuntime().removeShutdownHook(shutdownHook); + LOGGER.info("{}successfully removed shutdown hook {}", success ? "" : "un", shutdownHook); + return success; + } catch (IllegalStateException e) { + LOGGER.log(Level.DEBUG, "ignoring exception while attempting to remove shutdown hook", e); + return false; + } + } + private static class ShutdownWatchdog extends Thread { private final Semaphore startSemaphore = new Semaphore(0);
