SLIDER-782: change how the SliderClusterProtocolService is initialized;
this eliminates the race condition at startup where the ports are live but the
service is not fully initialized.


Project: http://git-wip-us.apache.org/repos/asf/incubator-slider/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-slider/commit/ca1e0088
Tree: http://git-wip-us.apache.org/repos/asf/incubator-slider/tree/ca1e0088
Diff: http://git-wip-us.apache.org/repos/asf/incubator-slider/diff/ca1e0088

Branch: refs/heads/feature/SLIDER-779_Move_to_Groovy_2.4.0
Commit: ca1e0088a733f4ea3b44b6ec4fe7ec85b47f007c
Parents: 7632f5d
Author: Steve Loughran <[email protected]>
Authored: Wed Feb 11 16:05:47 2015 +0000
Committer: Steve Loughran <[email protected]>
Committed: Wed Feb 11 16:05:47 2015 +0000

----------------------------------------------------------------------
 .../server/appmaster/SliderAppMaster.java       | 39 +++++++++-----
 .../rpc/SliderClusterProtocolService.java       | 54 +++++++++++---------
 2 files changed, 55 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/ca1e0088/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
index ab67b16..8a33735 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
@@ -217,7 +217,8 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
   public static final int NUM_RPC_HANDLERS = 5;
 
   /**
-   * Metrics and monitoring services
+   * Metrics and monitoring services.
+   * Deployed in {@link #serviceInit(Configuration)}
    */
   private final MetricsAndMonitoring metricsAndMonitoring = new MetricsAndMonitoring(); 
   /**
@@ -292,6 +293,11 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
   private final AppState appState =
       new AppState(new ProtobufRecordFactory(), metricsAndMonitoring);
 
+  /**
+   * App state for external objects. This is almost entirely
+   * a read-only view of the application state. To change the state,
+   * Providers (or anything else) are expected to queue async changes.
+   */
   private final ProviderAppState stateForProviders =
       new ProviderAppState("undefined", appState);
 
@@ -303,12 +309,6 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
   private final Condition isAMCompleted = AMExecutionStateLock.newCondition();
 
   /**
-   * Exit code for the AM to return
-   */
-  @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
-  private int amExitCode =  0;
-  
-  /**
    * Flag set if the AM is to be shutdown
    */
   private final AtomicBoolean amCompletionFlag = new AtomicBoolean(false);
@@ -382,8 +382,16 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
   private SliderAMProviderService sliderAMProvider;
   private CertificateManager certificateManager;
 
+  /**
+   * Executor.
+   * Assigned in {@link #serviceInit(Configuration)}
+   */
   private WorkflowExecutorService<ExecutorService> executorService;
-  
+
+  /**
+   * Action queues. Created at instance creation, but
+   * added as a child and inited in {@link #serviceInit(Configuration)}
+   */
   private final QueueService actionQueues = new QueueService();
   private String agentOpsUrl;
   private String agentStatusUrl;
@@ -397,6 +405,11 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
    * The port for the web application
    */
   private int webAppPort;
+
+  /**
+   * Is security enabled?
+   * Set early on in the {@link #createAndRunCluster(String)} operation.
+   */
   private boolean securityEnabled;
 
   /**
@@ -923,9 +936,6 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
       startAgentWebApp(appInformation, serviceConf, webAppApi);
       deployWebApplication(webAppPort, webAppApi);
 
-      // bind the IPC service to the API
-      sliderIPCService.bind(webAppApi);
-      
       // schedule YARN Registry registration
       queue(new ActionRegisterServiceInstance(clustername, appid));
 
@@ -1425,7 +1435,6 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
     String appMessage = stopAction.getMessage();
     //stop the daemon & grab its exit code
     int exitCode = stopAction.getExitCode();
-    amExitCode = exitCode;
 
     appStatus = stopAction.getFinalApplicationStatus();
     if (!spawnedProcessExitedBeforeShutdownTriggered) {
@@ -1489,7 +1498,11 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
       throws IOException, SliderException {
     verifyIPCAccess();
 
-    sliderIPCService = new SliderClusterProtocolService();
+    sliderIPCService = new SliderClusterProtocolService(
+        this,
+        stateForProviders,
+        actionQueues,
+        metricsAndMonitoring);
 
     deployChildService(sliderIPCService);
     SliderClusterProtocolPBImpl protobufRelay =

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/ca1e0088/slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderClusterProtocolService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderClusterProtocolService.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderClusterProtocolService.java
index 613bda4..d467414 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderClusterProtocolService.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/rpc/SliderClusterProtocolService.java
@@ -18,6 +18,7 @@
 
 package org.apache.slider.server.appmaster.rpc;
 
+import com.google.common.base.Preconditions;
 import org.apache.hadoop.ipc.ProtocolSignature;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
@@ -30,6 +31,7 @@ import org.apache.slider.core.conf.ConfTree;
 import org.apache.slider.core.exceptions.ServiceNotReadyException;
 import org.apache.slider.core.main.LauncherExitCodes;
 import org.apache.slider.core.persist.ConfTreeSerDeser;
+import org.apache.slider.server.appmaster.AppMasterActionOperations;
 import org.apache.slider.server.appmaster.actions.ActionFlexCluster;
 import org.apache.slider.server.appmaster.actions.ActionHalt;
 import org.apache.slider.server.appmaster.actions.ActionKillContainer;
@@ -39,7 +41,6 @@ import org.apache.slider.server.appmaster.actions.QueueAccess;
 import org.apache.slider.server.appmaster.management.MetricsAndMonitoring;
 import org.apache.slider.server.appmaster.state.RoleInstance;
 import org.apache.slider.server.appmaster.state.StateAccessForProviders;
-import org.apache.slider.server.appmaster.web.WebAppApi;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -49,8 +50,6 @@ import java.util.concurrent.TimeUnit;
 
 /**
  * Implement the {@link SliderClusterProtocol}.
- * Important: not live until {@link #bind(WebAppApi)}
- * is invoked. Until then all RPC calls will raise an exception.
  */
 public class SliderClusterProtocolService extends AbstractService
     implements SliderClusterProtocol {
@@ -58,28 +57,38 @@ public class SliderClusterProtocolService extends AbstractService
   protected static final Logger log =
       LoggerFactory.getLogger(SliderClusterProtocol.class);
 
-  private WebAppApi appmaster;
-  private QueueAccess actionQueues;
-  private StateAccessForProviders appState;
-  private MetricsAndMonitoring metricsAndMonitoring;
+  private final QueueAccess actionQueues;
+  private final StateAccessForProviders appState;
+  private final MetricsAndMonitoring metricsAndMonitoring;
+  private final AppMasterActionOperations amOperations;
+  
+  /**
+   * This is the prefix used for metrics
+   */
   public static final String PROTOCOL_PREFIX =
       "org.apache.slider.api.SliderClusterProtocol.";
 
-  public SliderClusterProtocolService() {
-    super("SliderClusterProtocolService");
-  }
-
   /**
-   * Bind to the AM API
-   * @param appmaster api binding.
+   * Constructor
+   * @param amOperations access to any AM operations
+   * @param appState state view
+   * @param actionQueues queues for actions
+   * @param metricsAndMonitoring metrics
    */
-  public void bind(WebAppApi appmaster) {
-    this.appmaster = appmaster;
-    actionQueues = appmaster.getQueues();
-    appState = appmaster.getAppState();
-    metricsAndMonitoring = appmaster.getMetricsAndMonitoring();
+  public SliderClusterProtocolService(AppMasterActionOperations amOperations,
+      StateAccessForProviders appState,
+      QueueAccess actionQueues,
+      MetricsAndMonitoring metricsAndMonitoring) {
+    super("SliderClusterProtocolService");
+    Preconditions.checkArgument(amOperations != null, "null amOperations");
+    Preconditions.checkArgument(appState != null, "null appState");
+    Preconditions.checkArgument(actionQueues != null, "null actionQueues");
+    Preconditions.checkArgument(metricsAndMonitoring != null, "null metricsAndMonitoring");
+    this.appState = appState;
+    this.actionQueues = actionQueues;
+    this.metricsAndMonitoring = metricsAndMonitoring;
+    this.amOperations = amOperations;
   }
-  
 
   @Override   //SliderClusterProtocol
   public ProtocolSignature getProtocolSignature(String protocol,
@@ -105,11 +114,6 @@ public class SliderClusterProtocolService extends AbstractService
    */
   protected void onRpcCall(String operation) throws IOException {
     log.debug("Received call to {}", operation);
-    if (appmaster == null) {
-      // fail fast if the service is not ready
-      log.warn("Rejecting {} as service is not ready", operation);
-      throw new ServiceNotReadyException(ServiceNotReadyException.E_NOT_READY);
-    }
     metricsAndMonitoring.markMeterAndCounter(PROTOCOL_PREFIX + operation);
   }
 
@@ -270,7 +274,7 @@ public class SliderClusterProtocolService extends AbstractService
     RoleInstance instance =
         appState.getLiveInstanceByContainerID(containerID);
     queue(new ActionKillContainer(instance.getId(), 0, TimeUnit.MILLISECONDS,
-        appmaster.getAMOperations()));
+        amOperations));
     Messages.KillContainerResponseProto.Builder builder =
         Messages.KillContainerResponseProto.newBuilder();
     builder.setSuccess(true);

Reply via email to