YARN-3673. Create a FailoverProxy for Federation services. Contributed by Subru 
Krishnan


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b5af206d
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b5af206d
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b5af206d

Branch: refs/heads/YARN-2915
Commit: b5af206d698b8f0340d6d10f6fbb8ee5089d6231
Parents: 888c150
Author: Jian He <jia...@apache.org>
Authored: Mon Aug 22 14:43:07 2016 +0800
Committer: Subru Krishnan <su...@apache.org>
Committed: Mon Sep 19 12:58:34 2016 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/yarn/conf/HAUtil.java     |  30 ++-
 .../hadoop/yarn/conf/YarnConfiguration.java     |  10 +
 .../yarn/conf/TestYarnConfigurationFields.java  |   4 +
 .../TestFederationRMFailoverProxyProvider.java  | 154 ++++++++++++++
 .../hadoop/yarn/client/ClientRMProxy.java       |   4 +-
 .../org/apache/hadoop/yarn/client/RMProxy.java  |  23 +-
 .../src/main/resources/yarn-default.xml         |   7 +
 .../hadoop-yarn-server-common/pom.xml           |   2 -
 .../hadoop/yarn/server/api/ServerRMProxy.java   |   4 +-
 .../failover/FederationProxyProviderUtil.java   | 163 ++++++++++++++
 .../FederationRMFailoverProxyProvider.java      | 211 +++++++++++++++++++
 .../federation/failover/package-info.java       |  17 ++
 12 files changed, 613 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/b5af206d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/HAUtil.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/HAUtil.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/HAUtil.java
index 594832e..cf221a7 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/HAUtil.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/HAUtil.java
@@ -18,7 +18,9 @@
 
 package org.apache.hadoop.yarn.conf;
 
-import com.google.common.annotations.VisibleForTesting;
+import java.net.InetSocketAddress;
+import java.util.Collection;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.HadoopIllegalArgumentException;
@@ -27,8 +29,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 
-import java.net.InetSocketAddress;
-import java.util.Collection;
+import com.google.common.annotations.VisibleForTesting;
 
 @InterfaceAudience.Private
 public class HAUtil {
@@ -44,6 +45,29 @@ public class HAUtil {
   }
 
   /**
+   * Returns true if Federation is configured.
+   *
+   * @param conf Configuration
+   * @return true if federation is configured in the configuration; else false.
+   */
+  public static boolean isFederationEnabled(Configuration conf) {
+    return conf.getBoolean(YarnConfiguration.FEDERATION_ENABLED,
+        YarnConfiguration.DEFAULT_FEDERATION_ENABLED);
+  }
+
+  /**
+   * Returns true if RM failover is enabled in a Federation setting.
+   *
+   * @param conf Configuration
+   * @return if RM failover is enabled in conjunction with Federation in the
+   *         configuration; else false.
+   */
+  public static boolean isFederationFailoverEnabled(Configuration conf) {
+    return conf.getBoolean(YarnConfiguration.FEDERATION_FAILOVER_ENABLED,
+        YarnConfiguration.DEFAULT_FEDERATION_FAILOVER_ENABLED);
+  }
+
+  /**
    * Returns true if Resource Manager HA is configured.
    *
    * @param conf Configuration

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b5af206d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 1e53bf4..293df71 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -2487,6 +2487,16 @@ public class YarnConfiguration extends Configuration {
 
   public static final String FEDERATION_PREFIX = YARN_PREFIX + "federation.";
 
+  public static final String FEDERATION_ENABLED = FEDERATION_PREFIX + 
"enabled";
+  public static final boolean DEFAULT_FEDERATION_ENABLED = false;
+
+  public static final String FEDERATION_FAILOVER_ENABLED =
+      FEDERATION_PREFIX + "failover.enabled";
+  public static final boolean DEFAULT_FEDERATION_FAILOVER_ENABLED = true;
+
+  public static final String FEDERATION_SUBCLUSTER_ID =
+      FEDERATION_PREFIX + "sub-cluster.id";
+
   public static final String FEDERATION_STATESTORE_CLIENT_CLASS =
       FEDERATION_PREFIX + "state-store.class";
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b5af206d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
index 000f5de..63413eb 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
@@ -95,6 +95,10 @@ public class TestYarnConfigurationFields extends 
TestConfigurationFieldsBase {
     // Federation default configs to be ignored
     configurationPropsToSkipCompare
         .add(YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_CLIENT_CLASS);
+    configurationPropsToSkipCompare
+        .add(YarnConfiguration.FEDERATION_SUBCLUSTER_ID);
+    configurationPropsToSkipCompare
+        .add(YarnConfiguration.FEDERATION_FAILOVER_ENABLED);
 
     // Ignore blacklisting nodes for AM failures feature since it is still a
     // "work in progress"

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b5af206d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestFederationRMFailoverProxyProvider.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestFederationRMFailoverProxyProvider.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestFederationRMFailoverProxyProvider.java
new file mode 100644
index 0000000..fa3523c
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestFederationRMFailoverProxyProvider.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.client;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ha.HAServiceProtocol;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.MiniYARNCluster;
+import 
org.apache.hadoop.yarn.server.federation.failover.FederationProxyProviderUtil;
+import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
+import 
org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
+import 
org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
+import 
org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
+import org.apache.hadoop.yarn.server.resourcemanager.HATestUtil;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Unit tests for FederationRMFailoverProxyProvider.
+ */
+public class TestFederationRMFailoverProxyProvider {
+
+  private Configuration conf;
+  private FederationStateStore stateStore;
+  private final String dummyCapability = "cap";
+
+  @Before
+  public void setUp() throws IOException, YarnException {
+    conf = new YarnConfiguration();
+    stateStore = new MemoryFederationStateStore();
+    stateStore.init(conf);
+    FederationStateStoreFacade.getInstance().reinitialize(stateStore, conf);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    stateStore.close();
+    stateStore = null;
+  }
+
+  @Test
+  public void testFederationRMFailoverProxyProvider() throws Exception {
+    final SubClusterId subClusterId = SubClusterId.newInstance("SC-1");
+    final MiniYARNCluster cluster = new MiniYARNCluster(
+        "testFederationRMFailoverProxyProvider", 3, 0, 1, 1);
+
+    conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
+    conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
+    conf.set(YarnConfiguration.RM_CLUSTER_ID, "cluster1");
+    conf.set(YarnConfiguration.RM_HA_IDS, "rm1,rm2,rm3");
+
+    conf.setLong(YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS,
+        2000);
+
+    HATestUtil.setRpcAddressForRM("rm1", 10000, conf);
+    HATestUtil.setRpcAddressForRM("rm2", 20000, conf);
+    HATestUtil.setRpcAddressForRM("rm3", 30000, conf);
+    conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS, true);
+
+    cluster.init(conf);
+    cluster.start();
+
+    // Transition rm3 to active;
+    makeRMActive(subClusterId, cluster, 2);
+
+    ApplicationClientProtocol client = FederationProxyProviderUtil
+        .createRMProxy(conf, ApplicationClientProtocol.class, subClusterId,
+            UserGroupInformation.getCurrentUser());
+
+    // client will retry until the rm becomes active.
+    GetClusterMetricsResponse response =
+        client.getClusterMetrics(GetClusterMetricsRequest.newInstance());
+
+    // validate response
+    checkResponse(response);
+
+    // transition rm3 to standby
+    cluster.getResourceManager(2).getRMContext().getRMAdminService()
+        .transitionToStandby(new HAServiceProtocol.StateChangeRequestInfo(
+            HAServiceProtocol.RequestSource.REQUEST_BY_USER));
+
+    // Transition rm2 to active;
+    makeRMActive(subClusterId, cluster, 1);
+    response = 
client.getClusterMetrics(GetClusterMetricsRequest.newInstance());
+
+    // validate response
+    checkResponse(response);
+
+    cluster.stop();
+  }
+
+  private void checkResponse(GetClusterMetricsResponse response) {
+    Assert.assertNotNull(response.getClusterMetrics());
+    Assert.assertEquals(0,
+        response.getClusterMetrics().getNumActiveNodeManagers());
+  }
+
+  private void makeRMActive(final SubClusterId subClusterId,
+      final MiniYARNCluster cluster, final int index) {
+    try {
+      System.out.println("Transition rm" + (index + 1) + " to active");
+      String dummyAddress = "host:" + index;
+      cluster.getResourceManager(index).getRMContext().getRMAdminService()
+          .transitionToActive(new HAServiceProtocol.StateChangeRequestInfo(
+              HAServiceProtocol.RequestSource.REQUEST_BY_USER));
+      ResourceManager rm = cluster.getResourceManager(index);
+      InetSocketAddress amRMAddress =
+          rm.getApplicationMasterService().getBindAddress();
+      InetSocketAddress clientRMAddress =
+          rm.getClientRMService().getBindAddress();
+      SubClusterRegisterRequest request = SubClusterRegisterRequest
+          .newInstance(SubClusterInfo.newInstance(subClusterId,
+              amRMAddress.getAddress().getHostAddress() + ":"
+                  + amRMAddress.getPort(),
+              clientRMAddress.getAddress().getHostAddress() + ":"
+                  + clientRMAddress.getPort(),
+              dummyAddress, dummyAddress, SubClusterState.SC_NEW, 1,
+              dummyCapability));
+      stateStore.registerSubCluster(request);
+    } catch (Exception e) {
+      e.printStackTrace();
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b5af206d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ClientRMProxy.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ClientRMProxy.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ClientRMProxy.java
index b29263e..6365662 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ClientRMProxy.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/ClientRMProxy.java
@@ -84,7 +84,7 @@ public class ClientRMProxy<T> extends RMProxy<T>  {
 
   @Private
   @Override
-  protected InetSocketAddress getRMAddress(YarnConfiguration conf,
+  public InetSocketAddress getRMAddress(YarnConfiguration conf,
       Class<?> protocol) throws IOException {
     if (protocol == ApplicationClientProtocol.class) {
       return conf.getSocketAddr(YarnConfiguration.RM_ADDRESS,
@@ -111,7 +111,7 @@ public class ClientRMProxy<T> extends RMProxy<T>  {
 
   @Private
   @Override
-  protected void checkAllowedProtocols(Class<?> protocol) {
+  public void checkAllowedProtocols(Class<?> protocol) {
     Preconditions.checkArgument(
         protocol.isAssignableFrom(ClientRMProtocols.class),
         "RM does not support this client protocol");

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b5af206d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMProxy.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMProxy.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMProxy.java
index 3ab06bd..f93a182 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMProxy.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMProxy.java
@@ -33,8 +33,8 @@ import java.util.concurrent.TimeUnit;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.retry.RetryPolicies;
 import org.apache.hadoop.io.retry.RetryPolicy;
@@ -64,14 +64,14 @@ public class RMProxy<T> {
    * Verify the passed protocol is supported.
    */
   @Private
-  protected void checkAllowedProtocols(Class<?> protocol) {}
+  public void checkAllowedProtocols(Class<?> protocol) {}
 
   /**
    * Get the ResourceManager address from the provided Configuration for the
    * given protocol.
    */
   @Private
-  protected InetSocketAddress getRMAddress(
+  public InetSocketAddress getRMAddress(
       YarnConfiguration conf, Class<?> protocol) throws IOException {
     throw new UnsupportedOperationException("This method should be invoked " +
         "from an instance of ClientRMProxy or ServerRMProxy");
@@ -90,7 +90,8 @@ public class RMProxy<T> {
     YarnConfiguration conf = (configuration instanceof YarnConfiguration)
         ? (YarnConfiguration) configuration
         : new YarnConfiguration(configuration);
-    RetryPolicy retryPolicy = createRetryPolicy(conf, 
HAUtil.isHAEnabled(conf));
+    RetryPolicy retryPolicy = createRetryPolicy(conf,
+        (HAUtil.isHAEnabled(conf) || 
HAUtil.isFederationFailoverEnabled(conf)));
     return newProxyInstance(conf, protocol, instance, retryPolicy);
   }
 
@@ -116,7 +117,7 @@ public class RMProxy<T> {
   private static <T> T newProxyInstance(final YarnConfiguration conf,
       final Class<T> protocol, RMProxy instance, RetryPolicy retryPolicy)
           throws IOException{
-    if (HAUtil.isHAEnabled(conf)) {
+    if (HAUtil.isHAEnabled(conf) || HAUtil.isFederationEnabled(conf)) {
       RMFailoverProxyProvider<T> provider =
           instance.createRMFailoverProxyProvider(conf, protocol);
       return (T) RetryProxy.create(protocol, provider, retryPolicy);
@@ -146,7 +147,8 @@ public class RMProxy<T> {
   @Deprecated
   public static <T> T createRMProxy(final Configuration conf,
       final Class<T> protocol, InetSocketAddress rmAddress) throws IOException 
{
-    RetryPolicy retryPolicy = createRetryPolicy(conf, 
HAUtil.isHAEnabled(conf));
+    RetryPolicy retryPolicy = createRetryPolicy(conf,
+        (HAUtil.isHAEnabled(conf) || 
HAUtil.isFederationFailoverEnabled(conf)));
     T proxy = RMProxy.<T>getProxy(conf, protocol, rmAddress);
     LOG.info("Connecting to ResourceManager at " + rmAddress);
     return (T) RetryProxy.create(protocol, proxy, retryPolicy);
@@ -155,9 +157,16 @@ public class RMProxy<T> {
   /**
    * Get a proxy to the RM at the specified address. To be used to create a
    * RetryProxy.
+   *
+   * @param conf Configuration to generate retry policy
+   * @param protocol Protocol for the proxy
+   * @param rmAddress Address of the ResourceManager
+   * @param <T> Type information of the proxy
+   * @return Proxy to the RM
+   * @throws IOException on failure
    */
   @Private
-  static <T> T getProxy(final Configuration conf,
+  public static <T> T getProxy(final Configuration conf,
       final Class<T> protocol, final InetSocketAddress rmAddress)
       throws IOException {
     return UserGroupInformation.getCurrentUser().doAs(

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b5af206d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 4fe916f..5461535 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -2632,6 +2632,13 @@
   <!-- Federation Configuration -->
   <property>
     <description>
+      Flag to indicate whether the RM is participating in Federation or not.
+    </description>
+    <name>yarn.federation.enabled</name>
+    <value>false</value>
+  </property>
+  <property>
+    <description>
       Machine list file to be loaded by the FederationSubCluster Resolver
     </description>
     <name>yarn.federation.machine-list</name>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b5af206d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml
index b6fd0c5..1faf754 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml
@@ -112,12 +112,10 @@
     <dependency>
       <groupId>javax.cache</groupId>
       <artifactId>cache-api</artifactId>
-      <version>${jcache.version}</version>
     </dependency>
     <dependency>
       <groupId>org.ehcache</groupId>
       <artifactId>ehcache</artifactId>
-      <version>${ehcache.version}</version>
     </dependency>
   </dependencies>
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b5af206d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ServerRMProxy.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ServerRMProxy.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ServerRMProxy.java
index 8555fc3..b3038e9 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ServerRMProxy.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/ServerRMProxy.java
@@ -71,7 +71,7 @@ public class ServerRMProxy<T> extends RMProxy<T> {
 
   @InterfaceAudience.Private
   @Override
-  protected InetSocketAddress getRMAddress(YarnConfiguration conf,
+  public InetSocketAddress getRMAddress(YarnConfiguration conf,
                                            Class<?> protocol) {
     if (protocol == ResourceTracker.class) {
       return conf.getSocketAddr(
@@ -93,7 +93,7 @@ public class ServerRMProxy<T> extends RMProxy<T> {
 
   @InterfaceAudience.Private
   @Override
-  protected void checkAllowedProtocols(Class<?> protocol) {
+  public void checkAllowedProtocols(Class<?> protocol) {
     Preconditions.checkArgument(
         protocol.isAssignableFrom(ResourceTracker.class),
         "ResourceManager does not support this protocol");

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b5af206d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/FederationProxyProviderUtil.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/FederationProxyProviderUtil.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/FederationProxyProviderUtil.java
new file mode 100644
index 0000000..a986008
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/FederationProxyProviderUtil.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.failover;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.security.SaslRpcServer;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.client.ClientRMProxy;
+import org.apache.hadoop.yarn.client.RMFailoverProxyProvider;
+import org.apache.hadoop.yarn.conf.HAUtil;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility class that creates proxy for specified protocols when federation is
+ * enabled. The class creates a federation aware failover provider, i.e. the
+ * failover provider uses the {@code FederationStateStore} to determine the
+ * current active ResourceManager
+ */
+@Private
+@Unstable
+public final class FederationProxyProviderUtil {
+
+  public static final Logger LOG =
+      LoggerFactory.getLogger(FederationProxyProviderUtil.class);
+
+  /**
+   * Create a proxy for the specified protocol. For non-HA, this is a direct
+   * connection to the ResourceManager address. When HA is enabled, the proxy
+   * handles the failover between the ResourceManagers as well.
+   *
+   * @param configuration Configuration to generate {@link ClientRMProxy}
+   * @param protocol Protocol for the proxy
+   * @param subClusterId the unique identifier or the sub-cluster
+   * @param user the user on whose behalf the proxy is being created
+   * @param <T> Type information of the proxy
+   * @return Proxy to the RM
+   * @throws IOException on failure
+   */
+  @Public
+  @Unstable
+  public static <T> T createRMProxy(Configuration configuration,
+      final Class<T> protocol, SubClusterId subClusterId,
+      UserGroupInformation user) throws IOException {
+    return createRMProxy(configuration, protocol, subClusterId, user, null);
+  }
+
+  /**
+   * Create a proxy for the specified protocol. For non-HA, this is a direct
+   * connection to the ResourceManager address. When HA is enabled, the proxy
+   * handles the failover between the ResourceManagers as well.
+   *
+   * @param configuration Configuration to generate {@link ClientRMProxy}
+   * @param protocol Protocol for the proxy
+   * @param subClusterId the unique identifier or the sub-cluster
+   * @param user the user on whose behalf the proxy is being created
+   * @param token the auth token to use for connection
+   * @param <T> Type information of the proxy
+   * @return Proxy to the RM
+   * @throws IOException on failure
+   */
+  @Public
+  @Unstable
+  @SuppressWarnings("unchecked")
+  public static <T> T createRMProxy(final Configuration configuration,
+      final Class<T> protocol, SubClusterId subClusterId,
+      UserGroupInformation user, final Token token) throws IOException {
+    try {
+      final YarnConfiguration conf = new YarnConfiguration(configuration);
+      updateConf(conf, subClusterId);
+      if (token != null) {
+        LOG.info(
+            "Creating RMProxy with a token: {} to subcluster: {}"
+                + " for protocol: {}",
+            token, subClusterId, protocol.getSimpleName());
+        user.addToken(token);
+        setAuthModeInConf(conf);
+      } else {
+        LOG.info("Creating RMProxy without a token to subcluster: {}"
+            + " for protocol: {}", subClusterId, protocol.getSimpleName());
+      }
+      final T proxyConnection = user.doAs(new PrivilegedExceptionAction<T>() {
+        @Override
+        public T run() throws Exception {
+          return ClientRMProxy.createRMProxy(conf, protocol);
+        }
+      });
+
+      return proxyConnection;
+    } catch (IOException e) {
+      String message =
+          "Error while creating of RM application master service proxy for"
+              + " appAttemptId: " + user;
+      LOG.info(message);
+      throw new YarnRuntimeException(message, e);
+    } catch (InterruptedException e) {
+      throw new YarnRuntimeException(e);
+    }
+  }
+
+  private static void setAuthModeInConf(Configuration conf) {
+    conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
+        SaslRpcServer.AuthMethod.TOKEN.toString());
+  }
+
+  // updating the conf with the refreshed RM addresses as proxy creations
+  // are based out of conf
+  private static void updateConf(Configuration conf,
+      SubClusterId subClusterId) {
+    conf.set(YarnConfiguration.FEDERATION_SUBCLUSTER_ID, subClusterId.getId());
+    // In a Federation setting, we will connect to not just the local cluster 
RM
+    // but also multiple external RMs. The membership information of all the 
RMs
+    // that are currently
+    // participating in Federation is available in the central
+    // FederationStateStore.
+    // So we will:
+    // 1. obtain the RM service addresses from FederationStateStore using the
+    // FederationRMFailoverProxyProvider.
+    // 2. disable traditional HA as that depends on local configuration lookup
+    // for RMs using indexes.
+    // 3. we will enable federation failover IF traditional HA is enabled so
+    // that the appropriate failover RetryPolicy is initialized.
+    conf.setBoolean(YarnConfiguration.FEDERATION_ENABLED, true);
+    conf.setClass(YarnConfiguration.CLIENT_FAILOVER_PROXY_PROVIDER,
+        FederationRMFailoverProxyProvider.class, 
RMFailoverProxyProvider.class);
+    if (HAUtil.isHAEnabled(conf)) {
+      conf.setBoolean(YarnConfiguration.FEDERATION_FAILOVER_ENABLED, true);
+      conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, false);
+    }
+  }
+
+  // disable instantiation
+  private FederationProxyProviderUtil() {
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b5af206d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/FederationRMFailoverProxyProvider.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/FederationRMFailoverProxyProvider.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/FederationRMFailoverProxyProvider.java
new file mode 100644
index 0000000..90a9239
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/FederationRMFailoverProxyProvider.java
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.failover;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Collection;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
+import org.apache.hadoop.yarn.client.RMFailoverProxyProvider;
+import org.apache.hadoop.yarn.client.RMProxy;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
+import 
org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * A FailoverProxyProvider implementation that uses the
+ * {@code FederationStateStore} to determine the ResourceManager to connect to.
+ * This supports both HA and regular mode which is controlled by configuration.
+ */
+@Private
+@Unstable
+public class FederationRMFailoverProxyProvider<T>
+    implements RMFailoverProxyProvider<T> {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(FederationRMFailoverProxyProvider.class);
+
+  private RMProxy<T> rmProxy;
+  private Class<T> protocol;
+  private T current;
+  private YarnConfiguration conf;
+  private FederationStateStoreFacade facade;
+  private SubClusterId subClusterId;
+  private Collection<Token<? extends TokenIdentifier>> originalTokens;
+  private boolean federationFailoverEnabled = false;
+
+  @Override
+  public void init(Configuration configuration, RMProxy<T> proxy,
+      Class<T> proto) {
+    this.rmProxy = proxy;
+    this.protocol = proto;
+    this.rmProxy.checkAllowedProtocols(this.protocol);
+    String clusterId =
+        configuration.get(YarnConfiguration.FEDERATION_SUBCLUSTER_ID);
+    Preconditions.checkNotNull(clusterId, "Missing Federation SubClusterId");
+    this.subClusterId = SubClusterId.newInstance(clusterId);
+    this.facade = facade.getInstance();
+    if (configuration instanceof YarnConfiguration) {
+      this.conf = (YarnConfiguration) configuration;
+    }
+    federationFailoverEnabled =
+        conf.getBoolean(YarnConfiguration.FEDERATION_FAILOVER_ENABLED,
+            YarnConfiguration.DEFAULT_FEDERATION_FAILOVER_ENABLED);
+
+    conf.setInt(
+        CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY,
+        conf.getInt(YarnConfiguration.CLIENT_FAILOVER_RETRIES,
+            YarnConfiguration.DEFAULT_CLIENT_FAILOVER_RETRIES));
+
+    conf.setInt(
+        
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY,
+        conf.getInt(
+            YarnConfiguration.CLIENT_FAILOVER_RETRIES_ON_SOCKET_TIMEOUTS,
+            
YarnConfiguration.DEFAULT_CLIENT_FAILOVER_RETRIES_ON_SOCKET_TIMEOUTS));
+
+    try {
+      UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
+      originalTokens = currentUser.getTokens();
+      LOG.info("Initialized Federation proxy for user: {}",
+          currentUser.getUserName());
+    } catch (IOException e) {
+      LOG.warn("Could not get information of requester, ignoring for now.");
+    }
+
+  }
+
+  private void addOriginalTokens(UserGroupInformation currentUser) {
+    if (originalTokens == null || originalTokens.isEmpty()) {
+      return;
+    }
+    for (Token<? extends TokenIdentifier> token : originalTokens) {
+      currentUser.addToken(token);
+    }
+  }
+
+  private T getProxyInternal(boolean isFailover) {
+    SubClusterInfo subClusterInfo;
+    UserGroupInformation currentUser = null;
+    try {
+      LOG.info("Failing over to the ResourceManager for SubClusterId: {}",
+          subClusterId);
+      subClusterInfo = facade.getSubCluster(subClusterId, isFailover);
+      // updating the conf with the refreshed RM addresses as proxy
+      // creations
+      // are based out of conf
+      updateRMAddress(subClusterInfo);
+      currentUser = UserGroupInformation.getCurrentUser();
+      addOriginalTokens(currentUser);
+    } catch (YarnException e) {
+      LOG.error("Exception while trying to create proxy to the ResourceManager"
+          + " for SubClusterId: {}", subClusterId, e);
+      return null;
+    } catch (IOException e) {
+      LOG.warn("Could not get information of requester, ignoring for now.");
+    }
+    try {
+      final InetSocketAddress rmAddress = rmProxy.getRMAddress(conf, protocol);
+      LOG.info("Connecting to {} with protocol {} as user: {}", rmAddress,
+          protocol.getSimpleName(), currentUser);
+      LOG.info("Failed over to the RM at {} for SubClusterId: {}", rmAddress,
+          subClusterId);
+      return RMProxy.getProxy(conf, protocol, rmAddress);
+    } catch (IOException ioe) {
+      LOG.error(
+          "IOException while trying to create proxy to the ResourceManager"
+              + " for SubClusterId: {}",
+          subClusterId, ioe);
+      return null;
+    }
+  }
+
+  private void updateRMAddress(SubClusterInfo subClusterInfo) {
+    if (subClusterInfo != null) {
+      if (protocol == ApplicationClientProtocol.class) {
+        conf.set(YarnConfiguration.RM_ADDRESS,
+            subClusterInfo.getClientRMServiceAddress());
+      } else if (protocol == ApplicationMasterProtocol.class) {
+        conf.set(YarnConfiguration.RM_SCHEDULER_ADDRESS,
+            subClusterInfo.getAMRMServiceAddress());
+      } else if (protocol == ResourceManagerAdministrationProtocol.class) {
+        conf.set(YarnConfiguration.RM_ADMIN_ADDRESS,
+            subClusterInfo.getRMAdminServiceAddress());
+      }
+    }
+  }
+
+  @Override
+  public synchronized ProxyInfo<T> getProxy() {
+    if (current == null) {
+      current = getProxyInternal(false);
+    }
+    return new ProxyInfo<T>(current, subClusterId.getId());
+  }
+
+  @Override
+  public synchronized void performFailover(T currentProxy) {
+    closeInternal(currentProxy);
+    current = getProxyInternal(federationFailoverEnabled);
+  }
+
+  @Override
+  public Class<T> getInterface() {
+    return protocol;
+  }
+
+  private void closeInternal(T currentProxy) {
+    if ((currentProxy != null) && (currentProxy instanceof Closeable)) {
+      try {
+        ((Closeable) currentProxy).close();
+      } catch (IOException e) {
+        LOG.warn("Exception while trying to close proxy", e);
+      }
+    } else {
+      RPC.stopProxy(currentProxy);
+    }
+
+  }
+
+  /**
+   * Close all the proxy objects which have been opened over the lifetime of
+   * this proxy provider.
+   */
+  @Override
+  public synchronized void close() throws IOException {
+    closeInternal(current);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b5af206d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/package-info.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/package-info.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/package-info.java
new file mode 100644
index 0000000..b1baa0c
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/failover/package-info.java
@@ -0,0 +1,17 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ */
+package org.apache.hadoop.yarn.server.federation.failover;


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to