SLIDER-782: changing how the SliderClusterProtocolService gets initialized; this eliminates the race condition at startup where the ports are live but the service is not yet fully initialized
Project: http://git-wip-us.apache.org/repos/asf/incubator-slider/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-slider/commit/ca1e0088 Tree: http://git-wip-us.apache.org/repos/asf/incubator-slider/tree/ca1e0088 Diff: http://git-wip-us.apache.org/repos/asf/incubator-slider/diff/ca1e0088 Branch: refs/heads/develop Commit: ca1e0088a733f4ea3b44b6ec4fe7ec85b47f007c Parents: 7632f5d Author: Steve Loughran <[email protected]> Authored: Wed Feb 11 16:05:47 2015 +0000 Committer: Steve Loughran <[email protected]> Committed: Wed Feb 11 16:05:47 2015 +0000 ---------------------------------------------------------------------- .../server/appmaster/SliderAppMaster.java | 39 +++++++++----- .../rpc/SliderClusterProtocolService.java | 54 +++++++++++--------- 2 files changed, 55 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/ca1e0088/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java index ab67b16..8a33735 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java @@ -217,7 +217,8 @@ public class SliderAppMaster extends AbstractSliderLaunchedService public static final int NUM_RPC_HANDLERS = 5; /** - * Metrics and monitoring services + * Metrics and monitoring services. 
+ * Deployed in {@link #serviceInit(Configuration)} */ private final MetricsAndMonitoring metricsAndMonitoring = new MetricsAndMonitoring(); /** @@ -292,6 +293,11 @@ public class SliderAppMaster extends AbstractSliderLaunchedService private final AppState appState = new AppState(new ProtobufRecordFactory(), metricsAndMonitoring); + /** + * App state for external objects. This is almost entirely + * a read-only view of the application state. To change the state, + * Providers (or anything else) are expected to queue async changes. + */ private final ProviderAppState stateForProviders = new ProviderAppState("undefined", appState); @@ -303,12 +309,6 @@ public class SliderAppMaster extends AbstractSliderLaunchedService private final Condition isAMCompleted = AMExecutionStateLock.newCondition(); /** - * Exit code for the AM to return - */ - @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") - private int amExitCode = 0; - - /** * Flag set if the AM is to be shutdown */ private final AtomicBoolean amCompletionFlag = new AtomicBoolean(false); @@ -382,8 +382,16 @@ public class SliderAppMaster extends AbstractSliderLaunchedService private SliderAMProviderService sliderAMProvider; private CertificateManager certificateManager; + /** + * Executor. + * Assigned in {@link #serviceInit(Configuration)} + */ private WorkflowExecutorService<ExecutorService> executorService; - + + /** + * Action queues. Created at instance creation, but + * added as a child and inited in {@link #serviceInit(Configuration)} + */ private final QueueService actionQueues = new QueueService(); private String agentOpsUrl; private String agentStatusUrl; @@ -397,6 +405,11 @@ public class SliderAppMaster extends AbstractSliderLaunchedService * The port for the web application */ private int webAppPort; + + /** + * Is security enabled? + * Set early on in the {@link #createAndRunCluster(String)} operation. 
+ */ private boolean securityEnabled; /** @@ -923,9 +936,6 @@ public class SliderAppMaster extends AbstractSliderLaunchedService startAgentWebApp(appInformation, serviceConf, webAppApi); deployWebApplication(webAppPort, webAppApi); - // bind the IPC service to the API - sliderIPCService.bind(webAppApi); - // schedule YARN Registry registration queue(new ActionRegisterServiceInstance(clustername, appid)); @@ -1425,7 +1435,6 @@ public class SliderAppMaster extends AbstractSliderLaunchedService String appMessage = stopAction.getMessage(); //stop the daemon & grab its exit code int exitCode = stopAction.getExitCode(); - amExitCode = exitCode; appStatus = stopAction.getFinalApplicationStatus(); if (!spawnedProcessExitedBeforeShutdownTriggered) { @@ -1489,7 +1498,11 @@ public class SliderAppMaster extends AbstractSliderLaunchedService throws IOException, SliderException { verifyIPCAccess(); - sliderIPCService = new SliderClusterProtocolService(); + sliderIPCService = new SliderClusterProtocolService( + this, + stateForProviders, + actionQueues, + metricsAndMonitoring); deployChildService(sliderIPCService); SliderClusterProtocolPBImpl protobufRelay = http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/ca1e0088/slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderClusterProtocolService.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderClusterProtocolService.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderClusterProtocolService.java index 613bda4..d467414 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderClusterProtocolService.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderClusterProtocolService.java @@ -18,6 +18,7 @@ package org.apache.slider.server.appmaster.rpc; +import com.google.common.base.Preconditions; import 
org.apache.hadoop.ipc.ProtocolSignature; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; @@ -30,6 +31,7 @@ import org.apache.slider.core.conf.ConfTree; import org.apache.slider.core.exceptions.ServiceNotReadyException; import org.apache.slider.core.main.LauncherExitCodes; import org.apache.slider.core.persist.ConfTreeSerDeser; +import org.apache.slider.server.appmaster.AppMasterActionOperations; import org.apache.slider.server.appmaster.actions.ActionFlexCluster; import org.apache.slider.server.appmaster.actions.ActionHalt; import org.apache.slider.server.appmaster.actions.ActionKillContainer; @@ -39,7 +41,6 @@ import org.apache.slider.server.appmaster.actions.QueueAccess; import org.apache.slider.server.appmaster.management.MetricsAndMonitoring; import org.apache.slider.server.appmaster.state.RoleInstance; import org.apache.slider.server.appmaster.state.StateAccessForProviders; -import org.apache.slider.server.appmaster.web.WebAppApi; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,8 +50,6 @@ import java.util.concurrent.TimeUnit; /** * Implement the {@link SliderClusterProtocol}. - * Important: not live until {@link #bind(WebAppApi)} - * is invoked. Until then all RPC calls will raise an exception. 
*/ public class SliderClusterProtocolService extends AbstractService implements SliderClusterProtocol { @@ -58,28 +57,38 @@ public class SliderClusterProtocolService extends AbstractService protected static final Logger log = LoggerFactory.getLogger(SliderClusterProtocol.class); - private WebAppApi appmaster; - private QueueAccess actionQueues; - private StateAccessForProviders appState; - private MetricsAndMonitoring metricsAndMonitoring; + private final QueueAccess actionQueues; + private final StateAccessForProviders appState; + private final MetricsAndMonitoring metricsAndMonitoring; + private final AppMasterActionOperations amOperations; + + /** + * This is the prefix used for metrics + */ public static final String PROTOCOL_PREFIX = "org.apache.slider.api.SliderClusterProtocol."; - public SliderClusterProtocolService() { - super("SliderClusterProtocolService"); - } - /** - * Bind to the AM API - * @param appmaster api binding. + * Constructor + * @param amOperations access to any AM operations + * @param appState state view + * @param actionQueues queues for actions + * @param metricsAndMonitoring metrics */ - public void bind(WebAppApi appmaster) { - this.appmaster = appmaster; - actionQueues = appmaster.getQueues(); - appState = appmaster.getAppState(); - metricsAndMonitoring = appmaster.getMetricsAndMonitoring(); + public SliderClusterProtocolService(AppMasterActionOperations amOperations, + StateAccessForProviders appState, + QueueAccess actionQueues, + MetricsAndMonitoring metricsAndMonitoring) { + super("SliderClusterProtocolService"); + Preconditions.checkArgument(amOperations != null, "null amOperations"); + Preconditions.checkArgument(appState != null, "null appState"); + Preconditions.checkArgument(actionQueues != null, "null actionQueues"); + Preconditions.checkArgument(metricsAndMonitoring != null, "null metricsAndMonitoring"); + this.appState = appState; + this.actionQueues = actionQueues; + this.metricsAndMonitoring = metricsAndMonitoring; + 
this.amOperations = amOperations; } - @Override //SliderClusterProtocol public ProtocolSignature getProtocolSignature(String protocol, @@ -105,11 +114,6 @@ public class SliderClusterProtocolService extends AbstractService */ protected void onRpcCall(String operation) throws IOException { log.debug("Received call to {}", operation); - if (appmaster == null) { - // fail fast if the service is not ready - log.warn("Rejecting {} as service is not ready", operation); - throw new ServiceNotReadyException(ServiceNotReadyException.E_NOT_READY); - } metricsAndMonitoring.markMeterAndCounter(PROTOCOL_PREFIX + operation); } @@ -270,7 +274,7 @@ public class SliderClusterProtocolService extends AbstractService RoleInstance instance = appState.getLiveInstanceByContainerID(containerID); queue(new ActionKillContainer(instance.getId(), 0, TimeUnit.MILLISECONDS, - appmaster.getAMOperations())); + amOperations)); Messages.KillContainerResponseProto.Builder builder = Messages.KillContainerResponseProto.newBuilder(); builder.setSuccess(true);
