Repository: incubator-slider Updated Branches: refs/heads/develop 1893c7cbb -> 70b4b7592
SLIDER-467 final status == SUCCEEDED on shut down app Project: http://git-wip-us.apache.org/repos/asf/incubator-slider/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-slider/commit/1a3e45bc Tree: http://git-wip-us.apache.org/repos/asf/incubator-slider/tree/1a3e45bc Diff: http://git-wip-us.apache.org/repos/asf/incubator-slider/diff/1a3e45bc Branch: refs/heads/develop Commit: 1a3e45bce82dd36a98e0228648e358f3ea6e0b49 Parents: 2051dd6 Author: Steve Loughran <[email protected]> Authored: Fri Oct 3 15:08:52 2014 -0700 Committer: Steve Loughran <[email protected]> Committed: Fri Oct 3 15:08:52 2014 -0700 ---------------------------------------------------------------------- .../org/apache/slider/client/SliderClient.java | 24 ++- .../TriggerClusterTeardownException.java | 16 +- .../apache/slider/server/appmaster/AMUtils.java | 3 +- .../server/appmaster/SliderAppMaster.java | 191 +++++++++++-------- .../appmaster/actions/ActionStopSlider.java | 98 +++++++++- .../server/appmaster/actions/QueueExecutor.java | 8 + .../server/appmaster/actions/QueueService.java | 22 ++- .../slider/server/appmaster/state/AppState.java | 8 +- .../agent/actions/TestActionStatus.groovy | 7 +- .../TestFreezeThawFlexStandaloneAM.groovy | 13 +- .../standalone/TestStandaloneAMDestroy.groovy | 17 +- .../standalone/TestStandaloneAMRestart.groovy | 29 ++- .../TestMockAppStateContainerFailure.groovy | 1 - .../slider/test/YarnMiniClusterTestBase.groovy | 24 ++- .../accumulo/live/TestAccFreezeThaw.groovy | 7 +- 15 files changed, 337 insertions(+), 131 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/1a3e45bc/slider-core/src/main/java/org/apache/slider/client/SliderClient.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/client/SliderClient.java b/slider-core/src/main/java/org/apache/slider/client/SliderClient.java index dc81f98..d1a1a7b 100644 --- a/slider-core/src/main/java/org/apache/slider/client/SliderClient.java +++ b/slider-core/src/main/java/org/apache/slider/client/SliderClient.java @@ -1545,7 +1545,7 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe } /** - * Kill the submitted application by sending a call to the ASM + * Kill the submitted application via YARN * @throws YarnException * @throws IOException */ @@ -1875,7 +1875,6 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe LaunchedApplication application = new LaunchedApplication(yarnClient, app); applicationId = application.getApplicationId(); - if (forcekill) { //escalating to forced kill application.kill("Forced stop of " + clustername + @@ -2423,7 +2422,7 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe * @throws IOException * Network or other problems */ - private int actionDiagnostic(ActionDiagnosticArgs diagnosticArgs) { + public int actionDiagnostic(ActionDiagnosticArgs diagnosticArgs) { try { if (diagnosticArgs.client) { actionDiagnosticClient(); @@ -2465,7 +2464,7 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe // we are catching exceptions here because those are indication of // validation result, and we need to print them here log.error("validation of slider-client.xml fails because: " - + e.toString()); + + e.toString(), e); return; } SliderClusterOperations clusterOperations = createClusterOperations(clusterName); @@ -2489,19 +2488,24 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe } try { SliderUtils.validateHDFSFile(sliderFileSystem, imagePath); - log.info("Slider agent tarball is properly installed"); + log.info("Slider agent package is properly installed"); + } catch (FileNotFoundException e) { + log.error("can not find agent package: {}", e); + return; } catch (IOException e) { - log.error("can not find or open agent tar ball: " + e.toString()); + log.error("can not open agent package: {}", e, e); return; } String pkgTarballPath = instanceDefinition.getAppConfOperations() .getGlobalOptions().getMandatoryOption(AgentKeys.APP_DEF); try { SliderUtils.validateHDFSFile(sliderFileSystem, pkgTarballPath); - log.info("Application tarball is properly installed"); - } catch (IOException e) { - log.error("can not find or open application tar ball: " - + e.toString()); + log.info("Application package is properly installed"); + } catch (FileNotFoundException e) { + log.error("can not find application package: {}", e); + return; + } catch (IOException e) { + log.error("can not open application package: {} ", e); return; } } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/1a3e45bc/slider-core/src/main/java/org/apache/slider/core/exceptions/TriggerClusterTeardownException.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/core/exceptions/TriggerClusterTeardownException.java b/slider-core/src/main/java/org/apache/slider/core/exceptions/TriggerClusterTeardownException.java index 7f59e41..d08b33a 100644 --- a/slider-core/src/main/java/org/apache/slider/core/exceptions/TriggerClusterTeardownException.java +++ b/slider-core/src/main/java/org/apache/slider/core/exceptions/TriggerClusterTeardownException.java @@ -18,21 +18,25 @@ package org.apache.slider.core.exceptions; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; + /** * An Exception to be thrown for an explicit "shut down the cluster" operation * raised by the application state or other parts of the AM */ public class TriggerClusterTeardownException extends SliderException { + private final FinalApplicationStatus finalApplicationStatus; + public TriggerClusterTeardownException(int code, - String message, - Object... args) { + String message, + FinalApplicationStatus finalApplicationStatus, + Object... args) { super(code, message, args); + this.finalApplicationStatus = finalApplicationStatus; } - public TriggerClusterTeardownException(int code, - Throwable throwable, - String message, Object... args) { - super(code, throwable, message, args); + public FinalApplicationStatus getFinalApplicationStatus() { + return finalApplicationStatus; } } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/1a3e45bc/slider-core/src/main/java/org/apache/slider/server/appmaster/AMUtils.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/AMUtils.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/AMUtils.java index 533ee54..39f511a 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/AMUtils.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/AMUtils.java @@ -40,8 +40,7 @@ public class AMUtils { } public static boolean isMappedExitAFailure(int mappedExitCode) { - return mappedExitCode!=LauncherExitCodes.EXIT_SUCCESS - && mappedExitCode!= LauncherExitCodes.EXIT_CLIENT_INITIATED_SHUTDOWN; + return mappedExitCode != 0; } } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/1a3e45bc/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java index 74a6f34..aac8106 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java @@ -285,8 +285,6 @@ public class SliderAppMaster extends AbstractSliderLaunchedService * Flag set during the init process */ private final AtomicBoolean initCompleted = new AtomicBoolean(false); - - private volatile boolean success = true; /** * Flag to set if the process exit code was set before shutdown started @@ -331,7 +329,12 @@ public class SliderAppMaster extends AbstractSliderLaunchedService * limit container memory */ private int containerMaxMemory; - private String amCompletionReason; + + /** + * The stop request received...the exit details are extracted + * from this + */ + private ActionStopSlider stopAction; @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") private RoleLaunchService launchService; @@ -409,8 +412,8 @@ public class SliderAppMaster extends AbstractSliderLaunchedService SliderUtils.validateSliderServerEnvironment(log); executorService = new WorkflowExecutorService<ExecutorService>("AmExecutor", - Executors.newFixedThreadPool(2, - new ServiceThreadFactory("AmExecutor", true))); + Executors.newFixedThreadPool(2, + new ServiceThreadFactory("AmExecutor", true))); addService(executorService); addService(actionQueues); @@ -423,6 +426,19 @@ public class SliderAppMaster extends AbstractSliderLaunchedService super.serviceStart(); } + @Override + protected void serviceStop() throws Exception { + super.serviceStop(); + + if (fsDelegationTokenManager != null) { + try { + fsDelegationTokenManager.cancelDelegationToken(getConfig()); + } catch (Exception e) { + log.info("Error cancelling HDFS delegation token", e); + } + } + } + /** * Start the queue processing */ @@ -810,22 +826,27 @@ public class SliderAppMaster extends AbstractSliderLaunchedService // now do the registration registerServiceInstance(clustername, appid); + // log the YARN and web UIs + log.info("RM Webapp address {}", serviceConf.get(YarnConfiguration.RM_WEBAPP_ADDRESS)); + log.info("slider Webapp address {}", appMasterTrackingUrl); + // declare the cluster initialized log.info("Application Master Initialization Completed"); initCompleted.set(true); - // start handling any scheduled events - - startQueueProcessing(); - // Start the Slider AM provider - - sliderAMProvider.start(); - - // launch the real provider; this is expected to trigger a callback that - // starts the node review process - launchProviderService(instanceDefinition, confDir); try { + // start handling any scheduled events + + startQueueProcessing(); + // Start the Slider AM provider + + sliderAMProvider.start(); + + // launch the real provider; this is expected to trigger a callback that + // starts the node review process + launchProviderService(instanceDefinition, confDir); + //now block waiting to be told to exit the process waitForAMCompletionSignal(); //shutdown time @@ -1054,50 +1075,53 @@ public class SliderAppMaster extends AbstractSliderLaunchedService } /** - * Declare that the AM is complete - * @param exitCode exit code for the aM - * @param reason reason for termination + * Signal that the AM is complete .. queues it in a separate thread + * + * @param stopActionRequest request containing shutdown details */ - public synchronized void signalAMComplete(int exitCode, String reason) { - amCompletionReason = reason; + public synchronized void signalAMComplete(ActionStopSlider stopActionRequest) { + // this is a queued action: schedule it through the queues + schedule(stopActionRequest); + } + /** + * Signal that the AM is complete + * + * @param stopActionRequest request containing shutdown details + */ + public synchronized void onAMStop(ActionStopSlider stopActionRequest) { + AMExecutionStateLock.lock(); try { - amCompletionFlag.set(true); - amExitCode = exitCode; - isAMCompleted.signal(); + if (amCompletionFlag.compareAndSet(false, true)) { + // first stop request received + this.stopAction = stopActionRequest; + isAMCompleted.signal(); + } } finally { AMExecutionStateLock.unlock(); } } + /** - * shut down the cluster + * trigger the YARN cluster termination process */ private synchronized void finish() { FinalApplicationStatus appStatus; - log.info("Triggering shutdown of the AM: {}", amCompletionReason); + log.info("Triggering shutdown of the AM: {}", stopAction); - String appMessage = amCompletionReason; + String appMessage = stopAction.getMessage(); //stop the daemon & grab its exit code - int exitCode = amExitCode; - success = exitCode == 0 || exitCode == 3; + int exitCode = stopAction.getExitCode(); + amExitCode = exitCode; - appStatus = success ? FinalApplicationStatus.SUCCEEDED: - FinalApplicationStatus.FAILED; + appStatus = stopAction.getFinalApplicationStatus(); if (!spawnedProcessExitedBeforeShutdownTriggered) { //stopped the forked process but don't worry about its exit code exitCode = stopForkedProcess(); log.debug("Stopped forked process: exit code={}", exitCode); } - if (fsDelegationTokenManager != null) { - try { - fsDelegationTokenManager.cancelDelegationToken(getConfig()); - } catch (Exception e) { - log.info("Error cancelling HDFS delegation token", e); - } - } - //stop any launches in progress launchService.stop(); @@ -1108,17 +1132,6 @@ public class SliderAppMaster extends AbstractSliderLaunchedService // signal to the RM log.info("Application completed. Signalling finish to RM"); - //if there were failed containers and the app isn't already down as failing, it is now -/* - int failedContainerCount = appState.getFailedCountainerCount(); - if (failedContainerCount != 0 && - appStatus == FinalApplicationStatus.SUCCEEDED) { - appStatus = FinalApplicationStatus.FAILED; - appMessage = - "Completed with exit code = " + exitCode + " - " + getContainerDiagnosticInfo(); - success = false; - } -*/ try { log.info("Unregistering AM status={} message={}", appStatus, appMessage); asyncRMClient.unregisterApplicationMaster(appStatus, appMessage, null); @@ -1134,11 +1147,12 @@ public class SliderAppMaster extends AbstractSliderLaunchedService } } - /** - * Get diagnostics info about containers - */ + /** + * Get diagnostics info about containers + */ private String getContainerDiagnosticInfo() { - return appState.getContainerDiagnosticInfo(); + + return appState.getContainerDiagnosticInfo(); } public Object getProxy(Class protocol, InetSocketAddress addr) { @@ -1342,16 +1356,12 @@ public class SliderAppMaster extends AbstractSliderLaunchedService //now apply the operations executeRMOperations(allOperations); } catch (TriggerClusterTeardownException e) { - //App state has decided that it is time to exit - log.error("Cluster teardown triggered %s", e); - signalAMComplete(e.getExitCode(), e.toString()); + log.error("Cluster teardown triggered {}", e, e); + queue(new ActionStopSlider(e)); } } - - - /** * Shutdown operation: release all containers */ @@ -1366,7 +1376,10 @@ public class SliderAppMaster extends AbstractSliderLaunchedService @Override //AMRMClientAsync public void onShutdownRequest() { LOG_YARN.info("Shutdown Request received"); - signalAMComplete(EXIT_CLIENT_INITIATED_SHUTDOWN, "Shutdown requested from RM"); + signalAMComplete(new ActionStopSlider("stop", + EXIT_CLIENT_INITIATED_SHUTDOWN, + FinalApplicationStatus.SUCCEEDED, + "Shutdown requested from RM")); } /** @@ -1392,8 +1405,10 @@ public class SliderAppMaster extends AbstractSliderLaunchedService public void onError(Throwable e) { //callback says it's time to finish LOG_YARN.error("AMRMClientAsync.onError() received " + e, e); - signalAMComplete(EXIT_EXCEPTION_THROWN, - "AMRMClientAsync.onError() received " + e); + signalAMComplete(new ActionStopSlider("stop", + EXIT_EXCEPTION_THROWN, + FinalApplicationStatus.FAILED, + "AMRMClientAsync.onError() received " + e)); } /* =================================================================== */ @@ -1440,8 +1455,14 @@ public class SliderAppMaster extends AbstractSliderLaunchedService YarnException { onRpcCall("stopCluster()"); String message = request.getMessage(); - log.info("SliderAppMasterApi.stopCluster: {}", message); - schedule(new ActionStopSlider(message, 1000, TimeUnit.MILLISECONDS)); + ActionStopSlider stopSlider = + new ActionStopSlider(message, + 1000, TimeUnit.MILLISECONDS, + 0, + FinalApplicationStatus.SUCCEEDED, + message); + log.info("SliderAppMasterApi.stopCluster: {}", stopSlider); + schedule(stopSlider); return Messages.StopClusterResponseProto.getDefaultInstance(); } @@ -1657,9 +1678,11 @@ public class SliderAppMaster extends AbstractSliderLaunchedService flexCluster(getInstanceDefinition().getResources()); } catch (Exception e) { //this may happen in a separate thread, so the ability to act is limited - log.error("Failed to flex cluster nodes", e); + log.error("Failed to flex cluster nodes: {}", e, e); //declare a failure - finish(); + queue(new ActionStopSlider("stop", + EXIT_DEPLOYMENT_FAILED, FinalApplicationStatus.FAILED, + "Failed to create application:" + e.toString())); } } @@ -1698,12 +1721,19 @@ public class SliderAppMaster extends AbstractSliderLaunchedService if (service == providerService && service.isInState(STATE.STOPPED)) { //its the current master process in play int exitCode = providerService.getExitCode(); - int mappedProcessExitCode = - AMUtils.mapProcessExitCodeToYarnExitCode(exitCode); + int mappedProcessExitCode = exitCode; + boolean shouldTriggerFailure = !amCompletionFlag.get() - && (AMUtils.isMappedExitAFailure(mappedProcessExitCode)); - + && (mappedProcessExitCode != 0); + if (shouldTriggerFailure) { + String reason = + "Spawned master exited with raw " + exitCode + " mapped to " + + mappedProcessExitCode; + ActionStopSlider stop = new ActionStopSlider("stop", + mappedProcessExitCode, + FinalApplicationStatus.FAILED, + reason); //this wasn't expected: the process finished early spawnedProcessExitedBeforeShutdownTriggered = true; log.info( @@ -1712,9 +1742,7 @@ public class SliderAppMaster extends AbstractSliderLaunchedService mappedProcessExitCode); //tell the AM the cluster is complete - signalAMComplete(mappedProcessExitCode, - "Spawned master exited with raw " + exitCode + " mapped to " + - mappedProcessExitCode); + signalAMComplete(stop); } else { //we don't care log.info( @@ -1860,11 +1888,20 @@ public class SliderAppMaster extends AbstractSliderLaunchedService */ public void onExceptionInThread(Thread thread, Exception exception) { log.error("Exception in {}: {}", thread.getName(), exception, exception); - int exitCode = EXIT_EXCEPTION_THROWN; - if (exception instanceof ExitCodeProvider) { - exitCode = ((ExitCodeProvider) exception).getExitCode(); + + // if there is a teardown in progress, ignore it + if (amCompletionFlag.get()) { + log.info("Ignoring exception: shutdown in progress"); + } else { + int exitCode = EXIT_EXCEPTION_THROWN; + if (exception instanceof ExitCodeProvider) { + exitCode = ((ExitCodeProvider) exception).getExitCode(); + } + signalAMComplete(new ActionStopSlider("stop", + exitCode, + FinalApplicationStatus.FAILED, + exception.toString())); } - signalAMComplete(exitCode, exception.toString()); } /** http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/1a3e45bc/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionStopSlider.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionStopSlider.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionStopSlider.java index 64b8e9e..39ff761 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionStopSlider.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/ActionStopSlider.java @@ -18,29 +18,115 @@ package org.apache.slider.server.appmaster.actions; -import org.apache.slider.core.main.LauncherExitCodes; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.slider.core.exceptions.TriggerClusterTeardownException; import org.apache.slider.server.appmaster.SliderAppMaster; import org.apache.slider.server.appmaster.state.AppState; import java.util.concurrent.TimeUnit; +/** + * Trigger an AM exit. This is used to build the exit status message for YARN + */ public class ActionStopSlider extends AsyncAction { + private int exitCode; + private FinalApplicationStatus finalApplicationStatus; + private String message; + + /** + * Simple constructor + * @param name action name + */ + public ActionStopSlider(String name) { + super(name); + } + + + /** + * Stop slider + * @param name action name + * @param delay execution delay + * @param timeUnit delay time unit + * @param exitCode process exit code + * @param finalApplicationStatus yarn status + * @param message message for AM + */ public ActionStopSlider(String name, long delay, - TimeUnit timeUnit) { + TimeUnit timeUnit, + int exitCode, + FinalApplicationStatus finalApplicationStatus, + String message) { super(name, delay, timeUnit, ATTR_HALTS_APP); + this.exitCode = exitCode; + this.finalApplicationStatus = finalApplicationStatus; + this.message = message; + } + + /** + * Stop slider + * @param name action name + * @param exitCode process exit code + * @param finalApplicationStatus yarn status + * @param message message for AM + */ + public ActionStopSlider(String name, + int exitCode, + FinalApplicationStatus finalApplicationStatus, String message) { + super(name); + this.exitCode = exitCode; + this.finalApplicationStatus = finalApplicationStatus; + this.message = message; } + /** + * Simple constructor + * @param name action name + */ + public ActionStopSlider(TriggerClusterTeardownException ex) { + this("stop", + ex.getExitCode(), + ex.getFinalApplicationStatus(), + ex.getMessage()); + } + @Override public void execute(SliderAppMaster appMaster, QueueAccess queueService, AppState appState) throws Exception { - String message = name; SliderAppMaster.getLog().info("SliderAppMasterApi.stopCluster: {}", message); - appMaster.signalAMComplete( - LauncherExitCodes.EXIT_CLIENT_INITIATED_SHUTDOWN, - message); + appMaster.onAMStop(this); + } + + @Override + public String toString() { + return String.format("%s: exit code = %d, %s: %s;", + name, exitCode, finalApplicationStatus, message) ; + } + + public int getExitCode() { + return exitCode; + } + + public void setExitCode(int exitCode) { + this.exitCode = exitCode; + } + + public FinalApplicationStatus getFinalApplicationStatus() { + return finalApplicationStatus; + } + + public void setFinalApplicationStatus(FinalApplicationStatus finalApplicationStatus) { + this.finalApplicationStatus = finalApplicationStatus; + } + + public String getMessage() { + return message; + } + + public void setMessage(String message) { + this.message = message; } } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/1a3e45bc/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/QueueExecutor.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/QueueExecutor.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/QueueExecutor.java index a40b0f3..1bc933f 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/QueueExecutor.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/QueueExecutor.java @@ -25,6 +25,8 @@ import org.apache.slider.server.appmaster.state.AppState; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.concurrent.atomic.AtomicBoolean; + /** * Executor for async actions - hands them off to the AM as * appropriate @@ -37,6 +39,7 @@ public class QueueExecutor implements Runnable { private final QueueService actionQueues; private final AppState appState; + public QueueExecutor(SliderAppMaster appMaster, QueueService actionQueues) { Preconditions.checkNotNull(appMaster); @@ -72,11 +75,16 @@ public class QueueExecutor implements Runnable { } while (!(take instanceof ActionStopQueue)); log.info("Queue Executor run() stopped"); + } catch (InterruptedException e) { + // interrupted: exit } catch (Exception e) { log.error("Exception processing {}: {}", take, e, e); if (appMaster != null) { appMaster.onExceptionInThread(Thread.currentThread(), e); } } + // tag completed + actionQueues.complete(); } + } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/1a3e45bc/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/QueueService.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/QueueService.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/QueueService.java index 5b24a35..146dea4 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/QueueService.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/actions/QueueService.java @@ -32,6 +32,7 @@ import java.util.concurrent.DelayQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; /** * The Queue service provides immediate and scheduled queues, as well @@ -51,6 +52,7 @@ implements Runnable, QueueAccess { private static final Logger log = LoggerFactory.getLogger(QueueService.class); public static final String NAME = "Action Queue"; + private final AtomicBoolean completed = new AtomicBoolean(false); /** * Immediate actions. @@ -176,7 +178,25 @@ implements Runnable, QueueAccess { } while (!(take instanceof ActionStopQueue)); log.info("QueueService processor terminated"); } catch (InterruptedException e) { - //game over + // interrupted during actions } + // the thread exits, but does not tag the service as complete. That's expected + // to be done by the stop queue + } + + + /** + * Check to see if the queue executor has completed + * @return the status + */ + public boolean isCompleted() { + return completed.get(); + } + + /** + * Package scoped method to mark the queue service as finished + */ + void complete() { + completed.set(true); } } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/1a3e45bc/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java index 02a69f5..63032b0 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java @@ -26,6 +26,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl; @@ -1536,16 +1537,17 @@ public class AppState { throws TriggerClusterTeardownException { int failures = role.getFailed(); int threshold = getFailureThresholdForRole(role); - log.debug("Failure count of role: {}: {}, threshold={}", + log.debug("Failure count of component: {}: {}, threshold={}", role.getName(), failures, threshold); if (failures > threshold) { throw new TriggerClusterTeardownException( SliderExitCodes.EXIT_DEPLOYMENT_FAILED, ErrorStrings.E_UNSTABLE_CLUSTER + - " - failed with role %s failing %d times (%d in startup);" + + " - failed with component %s failing %d times (%d in startup);" + " threshold is %d - last failure: %s", - role.getName(), + FinalApplicationStatus.FAILED, + role.getName(), role.getFailed(), role.getStartFailed(), threshold, http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/1a3e45bc/slider-core/src/test/groovy/org/apache/slider/agent/actions/TestActionStatus.groovy ---------------------------------------------------------------------- diff --git a/slider-core/src/test/groovy/org/apache/slider/agent/actions/TestActionStatus.groovy b/slider-core/src/test/groovy/org/apache/slider/agent/actions/TestActionStatus.groovy index bae8cea..9fcdb17 100644 --- a/slider-core/src/test/groovy/org/apache/slider/agent/actions/TestActionStatus.groovy +++ b/slider-core/src/test/groovy/org/apache/slider/agent/actions/TestActionStatus.groovy @@ -20,6 +20,7 @@ package org.apache.slider.agent.actions import groovy.transform.CompileStatic import groovy.util.logging.Slf4j +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus import org.apache.slider.agent.AgentMiniClusterTestBase import org.apache.slider.common.SliderExitCodes import org.apache.slider.api.ClusterDescription @@ -136,12 +137,13 @@ class TestActionStatus extends AgentMiniClusterTestBase { ] ) assert statusLauncher.serviceExitCode == 0 - tfile = new File(path) ClusterDescription cd2 = new ClusterDescription(); cd2.fromJson(text) clusterActionFreeze(sliderClient, clustername, "stopping first cluster") - waitForAppToFinish(sliderClient) + def finishedAppReport = waitForAppToFinish(sliderClient) + assert finishedAppReport.finalApplicationStatus == + FinalApplicationStatus.SUCCEEDED //now expect the status to fail try { @@ -151,6 +153,7 @@ class TestActionStatus extends AgentMiniClusterTestBase { assertExceptionDetails(e, SliderExitCodes.EXIT_BAD_STATE, ErrorStrings.E_APPLICATION_NOT_RUNNING) } + } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/1a3e45bc/slider-core/src/test/groovy/org/apache/slider/agent/freezethaw/TestFreezeThawFlexStandaloneAM.groovy ---------------------------------------------------------------------- diff --git a/slider-core/src/test/groovy/org/apache/slider/agent/freezethaw/TestFreezeThawFlexStandaloneAM.groovy b/slider-core/src/test/groovy/org/apache/slider/agent/freezethaw/TestFreezeThawFlexStandaloneAM.groovy index 7708502..c496fa9 100644 --- a/slider-core/src/test/groovy/org/apache/slider/agent/freezethaw/TestFreezeThawFlexStandaloneAM.groovy +++ b/slider-core/src/test/groovy/org/apache/slider/agent/freezethaw/TestFreezeThawFlexStandaloneAM.groovy @@ -22,6 +22,7 @@ import groovy.transform.CompileStatic import groovy.util.logging.Slf4j import org.apache.hadoop.fs.FileSystem as HadoopFS import org.apache.hadoop.fs.Path +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.slider.agent.AgentMiniClusterTestBase import org.apache.slider.client.SliderClient @@ -66,7 +67,8 @@ class TestFreezeThawFlexStandaloneAM extends AgentMiniClusterTestBase { addToTeardown(sliderClient); assert 0 == clusterActionFreeze(sliderClient, clustername) - + def report = sliderClient.applicationReport + assert report.finalApplicationStatus == FinalApplicationStatus.SUCCEEDED // here we do something devious: delete our copy of the configuration // this makes sure the remote config gets picked up @@ -83,11 +85,14 @@ class TestFreezeThawFlexStandaloneAM extends AgentMiniClusterTestBase { // while running, flex it with no changes newCluster.flex(clustername, [:]); - //stop - assert 0 == clusterActionFreeze(sliderClient, clustername) + // force freeze now + + assert 0 == clusterActionFreeze(newCluster, clustername, "forced", true) + report = newCluster.applicationReport + assert report.finalApplicationStatus == FinalApplicationStatus.KILLED //stop again - assert 0 == clusterActionFreeze(sliderClient, clustername) + assert 0 == clusterActionFreeze(newCluster, clustername) } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/1a3e45bc/slider-core/src/test/groovy/org/apache/slider/agent/standalone/TestStandaloneAMDestroy.groovy ---------------------------------------------------------------------- diff --git a/slider-core/src/test/groovy/org/apache/slider/agent/standalone/TestStandaloneAMDestroy.groovy b/slider-core/src/test/groovy/org/apache/slider/agent/standalone/TestStandaloneAMDestroy.groovy index aec4930..fa48b70 100644 --- a/slider-core/src/test/groovy/org/apache/slider/agent/standalone/TestStandaloneAMDestroy.groovy +++ b/slider-core/src/test/groovy/org/apache/slider/agent/standalone/TestStandaloneAMDestroy.groovy @@ -20,6 +20,7 @@ package org.apache.slider.agent.standalone import groovy.transform.CompileStatic import groovy.util.logging.Slf4j +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus import org.apache.slider.agent.AgentMiniClusterTestBase import org.apache.slider.client.SliderClient import org.apache.slider.common.SliderExitCodes @@ -42,8 +43,8 @@ import org.junit.Test class TestStandaloneAMDestroy extends AgentMiniClusterTestBase { @Test - public void testDestroyStandaloneAM() throws Throwable { - String clustername = createMiniCluster("", configuration, 1, false) + public void testStandaloneAMDestroy() throws Throwable { + String clustername = createMiniCluster("", configuration, 1, true) describe "create a Standalone AM, stop it, try to create" + "a second cluster with the same name, destroy it, try a third time" @@ -74,16 +75,17 @@ class TestStandaloneAMDestroy extends AgentMiniClusterTestBase { instanceDir) sliderFileSystem.locateInstanceDefinition(clustername) - clusterActionFreeze(sliderClient, clustername,"stopping first cluster") - waitForAppToFinish(sliderClient) + clusterActionFreeze(sliderClient, clustername, "stopping first cluster") + def finishedAppReport = waitForAppToFinish(sliderClient) + assert finishedAppReport.finalApplicationStatus == FinalApplicationStatus.SUCCEEDED describe "Warnings below are expected" //now try to create instance #2, and expect an in-use failure try { - createStandaloneAM(clustername, false, false) - fail("expected a failure, got an AM") + SliderClient am = createStandaloneAM(clustername, false, false).service + fail("expected a failure, got an AM: $am") } catch (SliderException e) { assertExceptionDetails(e, SliderExitCodes.EXIT_INSTANCE_EXISTS, @@ -102,6 +104,9 @@ class TestStandaloneAMDestroy extends AgentMiniClusterTestBase { describe "post destroy checks" sliderFileSystem.verifyDirectoryNonexistent(instanceDir) + // look up app report and verify exit code is good + + describe "start expected to fail" //expect start to now fail def ex = launchExpectingException(SliderClient, http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/1a3e45bc/slider-core/src/test/groovy/org/apache/slider/agent/standalone/TestStandaloneAMRestart.groovy ---------------------------------------------------------------------- diff --git a/slider-core/src/test/groovy/org/apache/slider/agent/standalone/TestStandaloneAMRestart.groovy b/slider-core/src/test/groovy/org/apache/slider/agent/standalone/TestStandaloneAMRestart.groovy index 8d9318a..1073309 100644 --- a/slider-core/src/test/groovy/org/apache/slider/agent/standalone/TestStandaloneAMRestart.groovy +++ b/slider-core/src/test/groovy/org/apache/slider/agent/standalone/TestStandaloneAMRestart.groovy @@ -21,18 +21,19 @@ package org.apache.slider.agent.standalone import groovy.transform.CompileStatic import groovy.util.logging.Slf4j import org.apache.hadoop.yarn.api.records.ApplicationReport +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.slider.agent.AgentMiniClusterTestBase import org.apache.slider.client.SliderClient import org.apache.slider.common.SliderXmlConfKeys import org.apache.slider.common.params.ActionAMSuicideArgs +import org.apache.slider.common.params.ActionDiagnosticArgs import org.apache.slider.common.params.Arguments import org.apache.slider.core.main.ServiceLauncher import org.junit.Test /** - * kill a masterless AM and verify it shuts down. This test - * also sets the retry count to 1 to stop recreation attempts + * kill an AM and verify it is restarted */ @CompileStatic @Slf4j @@ -41,7 +42,7 @@ class TestStandaloneAMRestart extends AgentMiniClusterTestBase { @Test - public void testRestartStandaloneAM() throws Throwable { + public void testStandaloneAMRestart() throws Throwable { describe "kill a Standalone AM and verify that it restarts" // patch the configuration for AM restart YarnConfiguration conf = getRestartableConfiguration(5) @@ -62,6 +63,11 @@ class TestStandaloneAMRestart extends AgentMiniClusterTestBase { waitUntilClusterLive(sliderClient, 30000) + def diagnosticArgs = new ActionDiagnosticArgs() + diagnosticArgs.client = true + diagnosticArgs.yarn = true + sliderClient.actionDiagnostic(diagnosticArgs) + ActionAMSuicideArgs args = new ActionAMSuicideArgs() args.message = "test AM iteration" args.waittime = 100 @@ -70,14 +76,23 @@ class TestStandaloneAMRestart extends AgentMiniClusterTestBase { waitWhileClusterLive(sliderClient); //give yarn some time to notice sleep(20000) - waitUntilClusterLive(sliderClient, 40000) + waitUntilClusterLive(sliderClient, 20000) // app should be running here assert 0 == sliderClient.actionExists(clustername, true) - - - clusterActionFreeze(sliderClient, clustername) + // kill again & expect it to be considered a failure + sliderClient.actionAmSuicide(clustername, args) + report = sliderClient.applicationReport + assert report.finalApplicationStatus == FinalApplicationStatus.FAILED + + logReport(report) + describe("Kill worked, freezing again") + assert 0 == clusterActionFreeze(sliderClient, clustername, "force", true) + assert 0 == clusterActionFreeze(sliderClient, clustername, "force", true) + assert 0 == clusterActionFreeze(sliderClient, clustername, "force", true) + assert 0 == clusterActionFreeze(sliderClient, clustername, "force", true) + assert 0 == clusterActionFreeze(sliderClient, clustername, "force", true) } /** http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/1a3e45bc/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateContainerFailure.groovy ---------------------------------------------------------------------- diff --git a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateContainerFailure.groovy b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateContainerFailure.groovy index 068b876..9902155 100644 --- a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateContainerFailure.groovy +++ b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/appstate/TestMockAppStateContainerFailure.groovy @@ -23,7 +23,6 @@ import groovy.util.logging.Slf4j import org.apache.hadoop.yarn.api.records.ContainerId import org.apache.slider.api.ResourceKeys import org.apache.slider.core.conf.AggregateConf -import org.apache.slider.core.conf.MapOperations import org.apache.slider.core.exceptions.SliderException import org.apache.slider.core.exceptions.TriggerClusterTeardownException import org.apache.slider.server.appmaster.actions.ResetFailureWindow http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/1a3e45bc/slider-core/src/test/groovy/org/apache/slider/test/YarnMiniClusterTestBase.groovy ---------------------------------------------------------------------- diff --git a/slider-core/src/test/groovy/org/apache/slider/test/YarnMiniClusterTestBase.groovy b/slider-core/src/test/groovy/org/apache/slider/test/YarnMiniClusterTestBase.groovy index 97cc853..aa82bdb 100644 --- a/slider-core/src/test/groovy/org/apache/slider/test/YarnMiniClusterTestBase.groovy +++ b/slider-core/src/test/groovy/org/apache/slider/test/YarnMiniClusterTestBase.groovy @@ -28,7 +28,6 @@ import org.apache.hadoop.fs.FileUtil import org.apache.hadoop.fs.Path import org.apache.hadoop.hdfs.MiniDFSCluster import org.apache.hadoop.service.ServiceOperations -import org.apache.hadoop.util.Shell import org.apache.hadoop.yarn.api.records.ApplicationReport import org.apache.hadoop.yarn.api.records.YarnApplicationState import org.apache.hadoop.yarn.conf.YarnConfiguration @@ -651,12 +650,18 @@ public abstract class YarnMiniClusterTestBase extends ServiceLauncherBaseTest { return resourceConfDir.absoluteFile.toURI().toString() } - + /** + * Log an application report + * @param report + */ public void logReport(ApplicationReport report) { log.info(SliderUtils.reportToString(report)); } - + /** + * Log a list of application reports + * @param apps + */ public void logApplications(List<ApplicationReport> apps) { apps.each { ApplicationReport r -> logReport(r) } } @@ -674,6 +679,7 @@ public abstract class YarnMiniClusterTestBase extends ServiceLauncherBaseTest { * force kill the application after waiting for * it to shut down cleanly * @param client client to talk to + * @return the final application report */ public ApplicationReport waitForAppToFinish(SliderClient client) { @@ -681,6 +687,13 @@ public abstract class YarnMiniClusterTestBase extends ServiceLauncherBaseTest { return waitForAppToFinish(client, waitTime) } + /** + * force kill the application after waiting for + * it to shut down cleanly + * @param client client to talk to + * @param waitTime time in milliseconds to wait + * @return the final application report + */ public static ApplicationReport waitForAppToFinish( SliderClient client, int waitTime) { @@ -697,6 +710,7 @@ public abstract class YarnMiniClusterTestBase extends ServiceLauncherBaseTest { } nodes.each { ClusterNode node -> log.info(node.toString())} client.forceKillApplication("timed out waiting for application to complete"); + report = client.applicationReport } return report; } @@ -709,11 +723,13 @@ public abstract class YarnMiniClusterTestBase extends ServiceLauncherBaseTest { * @return the exit code */ public int clusterActionFreeze(SliderClient sliderClient, String clustername, - String message = "action stop") { + String message = "action stop", + boolean force = false) { log.info("Stopping cluster $clustername: $message") ActionFreezeArgs freezeArgs = new ActionFreezeArgs(); freezeArgs.waittime = CLUSTER_STOP_TIME freezeArgs.message = message + freezeArgs.force = force int exitCode = sliderClient.actionFreeze(clustername, freezeArgs); if (exitCode != 0) { http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/1a3e45bc/slider-providers/accumulo/slider-accumulo-provider/src/test/groovy/org/apache/slider/providers/accumulo/live/TestAccFreezeThaw.groovy ---------------------------------------------------------------------- diff --git a/slider-providers/accumulo/slider-accumulo-provider/src/test/groovy/org/apache/slider/providers/accumulo/live/TestAccFreezeThaw.groovy b/slider-providers/accumulo/slider-accumulo-provider/src/test/groovy/org/apache/slider/providers/accumulo/live/TestAccFreezeThaw.groovy index 108bab2..3d0c03f 100644 --- a/slider-providers/accumulo/slider-accumulo-provider/src/test/groovy/org/apache/slider/providers/accumulo/live/TestAccFreezeThaw.groovy +++ b/slider-providers/accumulo/slider-accumulo-provider/src/test/groovy/org/apache/slider/providers/accumulo/live/TestAccFreezeThaw.groovy @@ -20,6 +20,7 @@ package org.apache.slider.providers.accumulo.live import groovy.transform.CompileStatic import groovy.util.logging.Slf4j +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus import org.apache.slider.core.main.ServiceLauncher import org.apache.slider.providers.accumulo.AccumuloConfigFileOptions import org.apache.slider.providers.accumulo.AccumuloKeys @@ -70,8 +71,10 @@ class TestAccFreezeThaw extends AccumuloTestBase { log.info("Stopping") clusterActionFreeze(sliderClient, clustername, "stop"); - waitForAppToFinish(sliderClient) - + def finishedAppReport = waitForAppToFinish(sliderClient) + assert finishedAppReport.finalApplicationStatus == + FinalApplicationStatus.SUCCEEDED + //make sure the fetch fails try { page = fetchLocalPage(AccumuloConfigFileOptions.MONITOR_PORT_CLIENT_INT,
