YARN-5467. InputValidator for the FederationStateStore internal APIs. (Giovanni 
Matteo Fumarola via Subru)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/888c1500
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/888c1500
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/888c1500

Branch: refs/heads/YARN-2915
Commit: 888c150048b0ca3fe57d29306fce465a213f81f4
Parents: aaedc83
Author: Subru Krishnan <su...@apache.org>
Authored: Wed Aug 17 12:07:06 2016 -0700
Committer: Subru Krishnan <su...@apache.org>
Committed: Mon Sep 19 12:58:34 2016 -0700

----------------------------------------------------------------------
 .../store/impl/MemoryFederationStateStore.java  |   30 +
 ...cationHomeSubClusterStoreInputValidator.java |  183 +++
 ...ationMembershipStateStoreInputValidator.java |  317 +++++
 .../FederationPolicyStoreInputValidator.java    |  144 ++
 ...derationStateStoreInvalidInputException.java |   48 +
 .../federation/store/utils/package-info.java    |   17 +
 .../impl/FederationStateStoreBaseTest.java      |    6 +-
 .../TestFederationStateStoreInputValidator.java | 1265 ++++++++++++++++++
 8 files changed, 2007 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/888c1500/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java
index 8144435..6e564dc 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java
@@ -57,6 +57,9 @@ import 
org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegister
 import 
org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterResponse;
 import 
org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterRequest;
 import 
org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterResponse;
+import 
org.apache.hadoop.yarn.server.federation.store.utils.FederationApplicationHomeSubClusterStoreInputValidator;
+import 
org.apache.hadoop.yarn.server.federation.store.utils.FederationMembershipStateStoreInputValidator;
+import 
org.apache.hadoop.yarn.server.federation.store.utils.FederationPolicyStoreInputValidator;
 import org.apache.hadoop.yarn.server.records.Version;
 import org.apache.hadoop.yarn.util.MonotonicClock;
 
@@ -88,6 +91,8 @@ public class MemoryFederationStateStore implements 
FederationStateStore {
   @Override
   public SubClusterRegisterResponse registerSubCluster(
       SubClusterRegisterRequest request) throws YarnException {
+    FederationMembershipStateStoreInputValidator
+        .validateSubClusterRegisterRequest(request);
     SubClusterInfo subClusterInfo = request.getSubClusterInfo();
     membership.put(subClusterInfo.getSubClusterId(), subClusterInfo);
     return SubClusterRegisterResponse.newInstance();
@@ -96,6 +101,8 @@ public class MemoryFederationStateStore implements 
FederationStateStore {
   @Override
   public SubClusterDeregisterResponse deregisterSubCluster(
       SubClusterDeregisterRequest request) throws YarnException {
+    FederationMembershipStateStoreInputValidator
+        .validateSubClusterDeregisterRequest(request);
     SubClusterInfo subClusterInfo = membership.get(request.getSubClusterId());
     if (subClusterInfo == null) {
       throw new YarnException(
@@ -111,6 +118,8 @@ public class MemoryFederationStateStore implements 
FederationStateStore {
   public SubClusterHeartbeatResponse subClusterHeartbeat(
       SubClusterHeartbeatRequest request) throws YarnException {
 
+    FederationMembershipStateStoreInputValidator
+        .validateSubClusterHeartbeatRequest(request);
     SubClusterId subClusterId = request.getSubClusterId();
     SubClusterInfo subClusterInfo = membership.get(subClusterId);
 
@@ -129,6 +138,9 @@ public class MemoryFederationStateStore implements 
FederationStateStore {
   @Override
   public GetSubClusterInfoResponse getSubCluster(
       GetSubClusterInfoRequest request) throws YarnException {
+
+    FederationMembershipStateStoreInputValidator
+        .validateGetSubClusterInfoRequest(request);
     SubClusterId subClusterId = request.getSubClusterId();
     if (!membership.containsKey(subClusterId)) {
       throw new YarnException(
@@ -157,6 +169,9 @@ public class MemoryFederationStateStore implements 
FederationStateStore {
   @Override
   public AddApplicationHomeSubClusterResponse addApplicationHomeSubCluster(
       AddApplicationHomeSubClusterRequest request) throws YarnException {
+
+    FederationApplicationHomeSubClusterStoreInputValidator
+        .validateAddApplicationHomeSubClusterRequest(request);
     ApplicationId appId =
         request.getApplicationHomeSubCluster().getApplicationId();
 
@@ -172,6 +187,9 @@ public class MemoryFederationStateStore implements 
FederationStateStore {
   @Override
   public UpdateApplicationHomeSubClusterResponse 
updateApplicationHomeSubCluster(
       UpdateApplicationHomeSubClusterRequest request) throws YarnException {
+
+    FederationApplicationHomeSubClusterStoreInputValidator
+        .validateUpdateApplicationHomeSubClusterRequest(request);
     ApplicationId appId =
         request.getApplicationHomeSubCluster().getApplicationId();
     if (!applications.containsKey(appId)) {
@@ -186,6 +204,9 @@ public class MemoryFederationStateStore implements 
FederationStateStore {
   @Override
   public GetApplicationHomeSubClusterResponse getApplicationHomeSubCluster(
       GetApplicationHomeSubClusterRequest request) throws YarnException {
+
+    FederationApplicationHomeSubClusterStoreInputValidator
+        .validateGetApplicationHomeSubClusterRequest(request);
     ApplicationId appId = request.getApplicationId();
     if (!applications.containsKey(appId)) {
       throw new YarnException("Application " + appId + " does not exist");
@@ -212,6 +233,9 @@ public class MemoryFederationStateStore implements 
FederationStateStore {
   @Override
   public DeleteApplicationHomeSubClusterResponse 
deleteApplicationHomeSubCluster(
       DeleteApplicationHomeSubClusterRequest request) throws YarnException {
+
+    FederationApplicationHomeSubClusterStoreInputValidator
+        .validateDeleteApplicationHomeSubClusterRequest(request);
     ApplicationId appId = request.getApplicationId();
     if (!applications.containsKey(appId)) {
       throw new YarnException("Application " + appId + " does not exist");
@@ -224,6 +248,9 @@ public class MemoryFederationStateStore implements 
FederationStateStore {
   @Override
   public GetSubClusterPolicyConfigurationResponse getPolicyConfiguration(
       GetSubClusterPolicyConfigurationRequest request) throws YarnException {
+
+    FederationPolicyStoreInputValidator
+        .validateGetSubClusterPolicyConfigurationRequest(request);
     String queue = request.getQueue();
     if (!policies.containsKey(queue)) {
       throw new YarnException("Policy for queue " + queue + " does not exist");
@@ -236,6 +263,9 @@ public class MemoryFederationStateStore implements 
FederationStateStore {
   @Override
   public SetSubClusterPolicyConfigurationResponse setPolicyConfiguration(
       SetSubClusterPolicyConfigurationRequest request) throws YarnException {
+
+    FederationPolicyStoreInputValidator
+        .validateSetSubClusterPolicyConfigurationRequest(request);
     policies.put(request.getPolicyConfiguration().getQueue(),
         request.getPolicyConfiguration());
     return SetSubClusterPolicyConfigurationResponse.newInstance();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/888c1500/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationApplicationHomeSubClusterStoreInputValidator.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationApplicationHomeSubClusterStoreInputValidator.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationApplicationHomeSubClusterStoreInputValidator.java
new file mode 100644
index 0000000..c14a452
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationApplicationHomeSubClusterStoreInputValidator.java
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.store.utils;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import 
org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
+import 
org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
+import 
org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterRequest;
+import 
org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest;
+import 
org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility class to validate the inputs to
+ * {@code FederationApplicationHomeSubClusterStore}, allows a fail fast
+ * mechanism for invalid user inputs.
+ *
+ */
+public final class FederationApplicationHomeSubClusterStoreInputValidator {
+
+  private static final Logger LOG = LoggerFactory
+      .getLogger(FederationApplicationHomeSubClusterStoreInputValidator.class);
+
+  private FederationApplicationHomeSubClusterStoreInputValidator() {
+  }
+
+  /**
+   * Quick validation on the input to check some obvious fail conditions (fail
+   * fast). Check if the provided {@link AddApplicationHomeSubClusterRequest}
+   * for adding a new application is valid or not.
+   *
+   * @param request the {@link AddApplicationHomeSubClusterRequest} to validate
+   *          against
+   * @throws FederationStateStoreInvalidInputException if the request is 
invalid
+   */
+  public static void validateAddApplicationHomeSubClusterRequest(
+      AddApplicationHomeSubClusterRequest request)
+      throws FederationStateStoreInvalidInputException {
+    if (request == null) {
+      String message = "Missing AddApplicationHomeSubCluster Request."
+          + " Please try again by specifying"
+          + " an AddApplicationHomeSubCluster information.";
+      LOG.warn(message);
+      throw new FederationStateStoreInvalidInputException(message);
+    }
+
+    // validate ApplicationHomeSubCluster info
+    checkApplicationHomeSubCluster(request.getApplicationHomeSubCluster());
+  }
+
+  /**
+   * Quick validation on the input to check some obvious fail conditions (fail
+   * fast). Check if the provided {@link 
UpdateApplicationHomeSubClusterRequest}
+   * for updating an application is valid or not.
+   *
+   * @param request the {@link UpdateApplicationHomeSubClusterRequest} to
+   *          validate against
+   * @throws FederationStateStoreInvalidInputException if the request is 
invalid
+   */
+  public static void validateUpdateApplicationHomeSubClusterRequest(
+      UpdateApplicationHomeSubClusterRequest request)
+      throws FederationStateStoreInvalidInputException {
+    if (request == null) {
+      String message = "Missing UpdateApplicationHomeSubCluster Request."
+          + " Please try again by specifying"
+          + " an ApplicationHomeSubCluster information.";
+      LOG.warn(message);
+      throw new FederationStateStoreInvalidInputException(message);
+    }
+
+    // validate ApplicationHomeSubCluster info
+    checkApplicationHomeSubCluster(request.getApplicationHomeSubCluster());
+  }
+
+  /**
+   * Quick validation on the input to check some obvious fail conditions (fail
+   * fast). Check if the provided {@link GetApplicationHomeSubClusterRequest}
+   * for querying application's information is valid or not.
+   *
+   * @param request the {@link GetApplicationHomeSubClusterRequest} to validate
+   *          against
+   * @throws FederationStateStoreInvalidInputException if the request is 
invalid
+   */
+  public static void validateGetApplicationHomeSubClusterRequest(
+      GetApplicationHomeSubClusterRequest request)
+      throws FederationStateStoreInvalidInputException {
+    if (request == null) {
+      String message = "Missing GetApplicationHomeSubCluster Request."
+          + " Please try again by specifying an Application Id information.";
+      LOG.warn(message);
+      throw new FederationStateStoreInvalidInputException(message);
+    }
+
+    // validate application Id
+    checkApplicationId(request.getApplicationId());
+  }
+
+  /**
+   * Quick validation on the input to check some obvious fail conditions (fail
+   * fast). Check if the provided {@link 
DeleteApplicationHomeSubClusterRequest}
+   * for deleting an application is valid or not.
+   *
+   * @param request the {@link DeleteApplicationHomeSubClusterRequest} to
+   *          validate against
+   * @throws FederationStateStoreInvalidInputException if the request is 
invalid
+   */
+  public static void validateDeleteApplicationHomeSubClusterRequest(
+      DeleteApplicationHomeSubClusterRequest request)
+      throws FederationStateStoreInvalidInputException {
+    if (request == null) {
+      String message = "Missing DeleteApplicationHomeSubCluster Request."
+          + " Please try again by specifying"
+          + " an ApplicationHomeSubCluster information.";
+      LOG.warn(message);
+      throw new FederationStateStoreInvalidInputException(message);
+    }
+
+    // validate application Id
+    checkApplicationId(request.getApplicationId());
+  }
+
+  /**
+   * Validate if the ApplicationHomeSubCluster info are present or not.
+   *
+   * @param applicationHomeSubCluster the information of the application to be
+   *          verified
+   * @throws FederationStateStoreInvalidInputException if the SubCluster Info
+   *           are invalid
+   */
+  private static void checkApplicationHomeSubCluster(
+      ApplicationHomeSubCluster applicationHomeSubCluster)
+
+      throws FederationStateStoreInvalidInputException {
+    if (applicationHomeSubCluster == null) {
+      String message = "Missing ApplicationHomeSubCluster Info."
+          + " Please try again by specifying"
+          + " an ApplicationHomeSubCluster information.";
+      LOG.warn(message);
+      throw new FederationStateStoreInvalidInputException(message);
+    }
+    // validate application Id
+    checkApplicationId(applicationHomeSubCluster.getApplicationId());
+
+    // validate subcluster Id
+    FederationMembershipStateStoreInputValidator
+        .checkSubClusterId(applicationHomeSubCluster.getHomeSubCluster());
+
+  }
+
+  /**
+   * Validate if the application id is present or not.
+   *
+   * @param appId the id of the application to be verified
+   * @throws FederationStateStoreInvalidInputException if the application Id is
+   *           invalid
+   */
+  private static void checkApplicationId(ApplicationId appId)
+      throws FederationStateStoreInvalidInputException {
+    if (appId == null) {
+      String message = "Missing Application Id."
+          + " Please try again by specifying an Application Id.";
+      LOG.warn(message);
+      throw new FederationStateStoreInvalidInputException(message);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/888c1500/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationMembershipStateStoreInputValidator.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationMembershipStateStoreInputValidator.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationMembershipStateStoreInputValidator.java
new file mode 100644
index 0000000..b587ee5
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationMembershipStateStoreInputValidator.java
@@ -0,0 +1,317 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.store.utils;
+
+import java.net.URI;
+
+import 
org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoRequest;
+import 
org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest;
+import 
org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
+import 
org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility class to validate the inputs to
+ * {@code FederationMembershipStateStore}, allows a fail fast mechanism for
+ * invalid user inputs.
+ *
+ */
+public final class FederationMembershipStateStoreInputValidator {
+
+  private static final Logger LOG = LoggerFactory
+      .getLogger(FederationMembershipStateStoreInputValidator.class);
+
+  private FederationMembershipStateStoreInputValidator() {
+  }
+
+  /**
+   * Quick validation on the input to check some obvious fail conditions (fail
+   * fast). Check if the provided {@link SubClusterRegisterRequest} for
+   * registration a new subcluster is valid or not.
+   *
+   * @param request the {@link SubClusterRegisterRequest} to validate against
+   * @throws FederationStateStoreInvalidInputException if the request is 
invalid
+   */
+  public static void validateSubClusterRegisterRequest(
+      SubClusterRegisterRequest request)
+      throws FederationStateStoreInvalidInputException {
+
+    // check if the request is present
+    if (request == null) {
+      String message = "Missing SubClusterRegister Request."
+          + " Please try again by specifying a"
+          + " SubCluster Register Information.";
+      LOG.warn(message);
+      throw new FederationStateStoreInvalidInputException(message);
+
+    }
+
+    // validate subcluster info
+    checkSubClusterInfo(request.getSubClusterInfo());
+  }
+
+  /**
+   * Quick validation on the input to check some obvious fail conditions (fail
+   * fast). Check if the provided {@link SubClusterDeregisterRequest} for
+   * deregistration a subcluster is valid or not.
+   *
+   * @param request the {@link SubClusterDeregisterRequest} to validate against
+   * @throws FederationStateStoreInvalidInputException if the request is 
invalid
+   */
+  public static void validateSubClusterDeregisterRequest(
+      SubClusterDeregisterRequest request)
+      throws FederationStateStoreInvalidInputException {
+
+    // check if the request is present
+    if (request == null) {
+      String message = "Missing SubClusterDeregister Request."
+          + " Please try again by specifying a"
+          + " SubCluster Deregister Information.";
+      LOG.warn(message);
+      throw new FederationStateStoreInvalidInputException(message);
+    }
+
+    // validate subcluster id
+    checkSubClusterId(request.getSubClusterId());
+    // validate subcluster state
+    checkSubClusterState(request.getState());
+    if (!request.getState().isFinal()) {
+      String message = "Invalid non-final state: " + request.getState();
+      LOG.warn(message);
+      throw new FederationStateStoreInvalidInputException(message);
+    }
+  }
+
+  /**
+   * Quick validation on the input to check some obvious fail conditions (fail
+   * fast). Check if the provided {@link SubClusterHeartbeatRequest} for
+   * heartbeating a subcluster is valid or not.
+   *
+   * @param request the {@link SubClusterHeartbeatRequest} to validate against
+   * @throws FederationStateStoreInvalidInputException if the request is 
invalid
+   */
+  public static void validateSubClusterHeartbeatRequest(
+      SubClusterHeartbeatRequest request)
+      throws FederationStateStoreInvalidInputException {
+
+    // check if the request is present
+    if (request == null) {
+      String message = "Missing SubClusterHeartbeat Request."
+          + " Please try again by specifying a"
+          + " SubCluster Heartbeat Information.";
+      LOG.warn(message);
+      throw new FederationStateStoreInvalidInputException(message);
+    }
+
+    // validate subcluster id
+    checkSubClusterId(request.getSubClusterId());
+    // validate last heartbeat timestamp
+    checkTimestamp(request.getLastHeartBeat());
+    // validate subcluster capability
+    checkCapability(request.getCapability());
+    // validate subcluster state
+    checkSubClusterState(request.getState());
+
+  }
+
+  /**
+   * Quick validation on the input to check some obvious fail conditions (fail
+   * fast). Check if the provided {@link GetSubClusterInfoRequest} for querying
+   * subcluster's information is valid or not.
+   *
+   * @param request the {@link GetSubClusterInfoRequest} to validate against
+   * @throws FederationStateStoreInvalidInputException if the request is 
invalid
+   */
+  public static void validateGetSubClusterInfoRequest(
+      GetSubClusterInfoRequest request)
+      throws FederationStateStoreInvalidInputException {
+
+    // check if the request is present
+    if (request == null) {
+      String message = "Missing GetSubClusterInfo Request."
+          + " Please try again by specifying a Get SubCluster information.";
+      LOG.warn(message);
+      throw new FederationStateStoreInvalidInputException(message);
+    }
+
+    // validate subcluster id
+    checkSubClusterId(request.getSubClusterId());
+  }
+
+  /**
+   * Validate if the SubCluster Info are present or not.
+   *
+   * @param subClusterInfo the information of the subcluster to be verified
+   * @throws FederationStateStoreInvalidInputException if the SubCluster Info
+   *           are invalid
+   */
+  private static void checkSubClusterInfo(SubClusterInfo subClusterInfo)
+      throws FederationStateStoreInvalidInputException {
+    if (subClusterInfo == null) {
+      String message = "Missing SubCluster Information."
+          + " Please try again by specifying SubCluster Information.";
+      LOG.warn(message);
+      throw new FederationStateStoreInvalidInputException(message);
+    }
+
+    // validate subcluster id
+    checkSubClusterId(subClusterInfo.getSubClusterId());
+
+    // validate AMRM Service address
+    checkAddress(subClusterInfo.getAMRMServiceAddress());
+    // validate ClientRM Service address
+    checkAddress(subClusterInfo.getClientRMServiceAddress());
+    // validate RMClient Service address
+    checkAddress(subClusterInfo.getRMAdminServiceAddress());
+    // validate RMWeb Service address
+    checkAddress(subClusterInfo.getRMWebServiceAddress());
+
+    // validate last heartbeat timestamp
+    checkTimestamp(subClusterInfo.getLastHeartBeat());
+    // validate last start timestamp
+    checkTimestamp(subClusterInfo.getLastStartTime());
+
+    // validate subcluster state
+    checkSubClusterState(subClusterInfo.getState());
+
+    // validate subcluster capability
+    checkCapability(subClusterInfo.getCapability());
+  }
+
+  /**
+   * Validate if the timestamp is positive or not.
+   *
+   * @param timestamp the timestamp to be verified
+   * @throws FederationStateStoreInvalidInputException if the timestamp is
+   *           invalid
+   */
+  private static void checkTimestamp(long timestamp)
+      throws FederationStateStoreInvalidInputException {
+    if (timestamp < 0) {
+      String message = "Invalid timestamp information."
+          + " Please try again by specifying valid Timestamp Information.";
+      LOG.warn(message);
+      throw new FederationStateStoreInvalidInputException(message);
+    }
+  }
+
+  /**
+   * Validate if the Capability is present or not.
+   *
+   * @param capability the capability of the subcluster to be verified
+   * @throws FederationStateStoreInvalidInputException if the capability is
+   *           invalid
+   */
+  private static void checkCapability(String capability)
+      throws FederationStateStoreInvalidInputException {
+    if (capability == null || capability.isEmpty()) {
+      String message = "Invalid capability information."
+          + " Please try again by specifying valid Capability Information.";
+      LOG.warn(message);
+      throw new FederationStateStoreInvalidInputException(message);
+    }
+  }
+
+  /**
+   * Validate if the SubCluster Id is present or not.
+   *
+   * @param subClusterId the identifier of the subcluster to be verified
+   * @throws FederationStateStoreInvalidInputException if the SubCluster Id is
+   *           invalid
+   */
+  protected static void checkSubClusterId(SubClusterId subClusterId)
+      throws FederationStateStoreInvalidInputException {
+    // check if cluster id is present
+    if (subClusterId == null) {
+      String message = "Missing SubCluster Id information."
+          + " Please try again by specifying Subcluster Id information.";
+      LOG.warn(message);
+      throw new FederationStateStoreInvalidInputException(message);
+    }
+    // check if cluster id is valid
+    if (subClusterId.getId().isEmpty()) {
+      String message = "Invalid SubCluster Id information."
+          + " Please try again by specifying valid Subcluster Id.";
+      LOG.warn(message);
+      throw new FederationStateStoreInvalidInputException(message);
+    }
+  }
+
+  /**
+   * Validate if the SubCluster Address is a valid URL or not.
+   *
+   * @param address the endpoint of the subcluster to be verified
+   * @throws FederationStateStoreInvalidInputException if the address is 
invalid
+   */
+  private static void checkAddress(String address)
+      throws FederationStateStoreInvalidInputException {
+    // Ensure url is not null
+    if (address == null || address.isEmpty()) {
+      String message = "Missing SubCluster Endpoint information."
+          + " Please try again by specifying SubCluster Endpoint information.";
+      LOG.warn(message);
+      throw new FederationStateStoreInvalidInputException(message);
+    }
+    // Validate url is well formed
+    boolean hasScheme = address.contains("://");
+    URI uri = null;
+    try {
+      uri = hasScheme ? URI.create(address)
+          : URI.create("dummyscheme://" + address);
+    } catch (IllegalArgumentException e) {
+      String message = "The provided SubCluster Endpoint does not contain a"
+          + " valid host:port authority: " + address;
+      LOG.warn(message);
+      throw new FederationStateStoreInvalidInputException(message);
+    }
+    String host = uri.getHost();
+    int port = uri.getPort();
+    String path = uri.getPath();
+    if ((host == null) || (port < 0)
+        || (!hasScheme && path != null && !path.isEmpty())) {
+      String message = "The provided SubCluster Endpoint does not contain a"
+          + " valid host:port authority: " + address;
+      LOG.warn(message);
+      throw new FederationStateStoreInvalidInputException(message);
+    }
+  }
+
+  /**
+   * Validate if the SubCluster State is present or not.
+   *
+   * @param state the state of the subcluster to be verified
+   * @throws FederationStateStoreInvalidInputException if the SubCluster State
+   *           is invalid
+   */
+  private static void checkSubClusterState(SubClusterState state)
+      throws FederationStateStoreInvalidInputException {
+    // check sub-cluster state is not empty
+    if (state == null) {
+      String message = "Missing SubCluster State information."
+          + " Please try again by specifying SubCluster State information.";
+      LOG.warn(message);
+      throw new FederationStateStoreInvalidInputException(message);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/888c1500/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationPolicyStoreInputValidator.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationPolicyStoreInputValidator.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationPolicyStoreInputValidator.java
new file mode 100644
index 0000000..273a8ac
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationPolicyStoreInputValidator.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.store.utils;
+
+import 
org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationRequest;
+import 
org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationRequest;
+import 
org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility class to validate the inputs to {@code FederationPolicyStore}, 
allows
+ * a fail fast mechanism for invalid user inputs.
+ *
+ */
+public final class FederationPolicyStoreInputValidator {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(FederationPolicyStoreInputValidator.class);
+
+  private FederationPolicyStoreInputValidator() {
+  }
+
+  /**
+   * Quick validation on the input to check some obvious fail conditions (fail
+   * fast). Check if the provided
+   * {@link GetSubClusterPolicyConfigurationRequest} for querying policy's
+   * information is valid or not.
+   *
+   * @param request the {@link GetSubClusterPolicyConfigurationRequest} to
+   *          validate against
+   * @throws FederationStateStoreInvalidInputException if the request is 
invalid
+   */
+  public static void validateGetSubClusterPolicyConfigurationRequest(
+      GetSubClusterPolicyConfigurationRequest request)
+      throws FederationStateStoreInvalidInputException {
+    if (request == null) {
+      String message = "Missing GetSubClusterPolicyConfiguration Request."
+          + " Please try again by specifying a policy selection information.";
+      LOG.warn(message);
+      throw new FederationStateStoreInvalidInputException(message);
+    }
+
+    // validate queue id
+    checkQueue(request.getQueue());
+  }
+
+  /**
+   * Quick validation on the input to check some obvious fail conditions (fail
+   * fast). Check if the provided
+   * {@link SetSubClusterPolicyConfigurationRequest} for adding a new policy is
+   * valid or not.
+   *
+   * @param request the {@link SetSubClusterPolicyConfigurationRequest} to
+   *          validate against
+   * @throws FederationStateStoreInvalidInputException if the request is 
invalid
+   */
+  public static void validateSetSubClusterPolicyConfigurationRequest(
+      SetSubClusterPolicyConfigurationRequest request)
+      throws FederationStateStoreInvalidInputException {
+    if (request == null) {
+      String message = "Missing SetSubClusterPolicyConfiguration Request."
+          + " Please try again by specifying an policy insertion information.";
+      LOG.warn(message);
+      throw new FederationStateStoreInvalidInputException(message);
+    }
+
+    // validate subcluster policy configuration
+    checkSubClusterPolicyConfiguration(request.getPolicyConfiguration());
+  }
+
+  /**
+   * Validate if the SubClusterPolicyConfiguration is valid or not.
+   *
+   * @param policyConfiguration the policy information to be verified
+   * @throws FederationStateStoreInvalidInputException if the policy 
information
+   *           are invalid
+   */
+  private static void checkSubClusterPolicyConfiguration(
+      SubClusterPolicyConfiguration policyConfiguration)
+      throws FederationStateStoreInvalidInputException {
+    if (policyConfiguration == null) {
+      String message = "Missing SubClusterPolicyConfiguration."
+          + " Please try again by specifying a SubClusterPolicyConfiguration.";
+      LOG.warn(message);
+      throw new FederationStateStoreInvalidInputException(message);
+    }
+
+    // validate queue id
+    checkQueue(policyConfiguration.getQueue());
+    // validate policy type
+    checkType(policyConfiguration.getType());
+
+  }
+
+  /**
+   * Validate if the queue id is a valid or not.
+   *
+   * @param queue the queue id of the policy to be verified
+   * @throws FederationStateStoreInvalidInputException if the queue id is
+   *           invalid
+   */
+  private static void checkQueue(String queue)
+      throws FederationStateStoreInvalidInputException {
+    if (queue == null || queue.isEmpty()) {
+      String message = "Missing Queue. Please try again by specifying a 
Queue.";
+      LOG.warn(message);
+      throw new FederationStateStoreInvalidInputException(message);
+    }
+  }
+
+  /**
+   * Validate if the policy type is a valid or not.
+   *
+   * @param type the type of the policy to be verified
+   * @throws FederationStateStoreInvalidInputException if the policy is invalid
+   */
+  private static void checkType(String type)
+      throws FederationStateStoreInvalidInputException {
+    if (type == null || type.isEmpty()) {
+      String message = "Missing Policy Type."
+          + " Please try again by specifying a Policy Type.";
+      LOG.warn(message);
+      throw new FederationStateStoreInvalidInputException(message);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/888c1500/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationStateStoreInvalidInputException.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationStateStoreInvalidInputException.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationStateStoreInvalidInputException.java
new file mode 100644
index 0000000..ea1428d
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/FederationStateStoreInvalidInputException.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.store.utils;
+
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+/**
+ * Exception thrown by the {@link 
FederationMembershipStateStoreInputValidator},
+ * {@link FederationApplicationHomeSubClusterStoreInputValidator},
+ * {@link FederationPolicyStoreInputValidator} if the input is invalid.
+ *
+ */
+public class FederationStateStoreInvalidInputException extends YarnException {
+
+  /**
+   * IDE auto-generated.
+   */
+  private static final long serialVersionUID = -7352144682711430801L;
+
+  public FederationStateStoreInvalidInputException(Throwable cause) {
+    super(cause);
+  }
+
+  public FederationStateStoreInvalidInputException(String message) {
+    super(message);
+  }
+
+  public FederationStateStoreInvalidInputException(String message,
+      Throwable cause) {
+    super(message, cause);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/888c1500/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/package-info.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/package-info.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/package-info.java
new file mode 100644
index 0000000..f4a9c7e
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/utils/package-info.java
@@ -0,0 +1,17 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ */
+package org.apache.hadoop.yarn.server.federation.store.utils;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/888c1500/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java
index 414696b..63a5b65 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java
@@ -162,9 +162,9 @@ public abstract class FederationStateStoreBaseTest {
         SubClusterRegisterRequest.newInstance(subClusterInfo2));
 
     stateStore.subClusterHeartbeat(SubClusterHeartbeatRequest
-        .newInstance(subClusterId1, SubClusterState.SC_RUNNING, ""));
-    stateStore.subClusterHeartbeat(SubClusterHeartbeatRequest
-        .newInstance(subClusterId2, SubClusterState.SC_UNHEALTHY, ""));
+        .newInstance(subClusterId1, SubClusterState.SC_RUNNING, "capability"));
+    stateStore.subClusterHeartbeat(SubClusterHeartbeatRequest.newInstance(
+        subClusterId2, SubClusterState.SC_UNHEALTHY, "capability"));
 
     Assert.assertTrue(
         stateStore.getSubClusters(GetSubClustersInfoRequest.newInstance(true))


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to