Repository: hadoop
Updated Branches:
  refs/heads/trunk 8c4b6d16a -> bbbf0e2a4


HADOOP-14741. Refactor curator based ZooKeeper communication into common 
library. (Íñigo Goiri via Subru).


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/bbbf0e2a
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/bbbf0e2a
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/bbbf0e2a

Branch: refs/heads/trunk
Commit: bbbf0e2a4136b30cad9dfd36ef138650a1adea60
Parents: 8c4b6d1
Author: Subru Krishnan <su...@apache.org>
Authored: Fri Aug 11 13:58:45 2017 -0700
Committer: Subru Krishnan <su...@apache.org>
Committed: Fri Aug 11 13:58:45 2017 -0700

----------------------------------------------------------------------
 .../hadoop/fs/CommonConfigurationKeys.java      |  21 ++
 .../hadoop/util/curator/ZKCuratorManager.java   | 294 +++++++++++++++++++
 .../hadoop/util/curator/package-info.java       |  27 ++
 .../src/main/resources/core-default.xml         |  46 +++
 .../util/curator/TestZKCuratorManager.java      |  95 ++++++
 .../hadoop/yarn/conf/YarnConfiguration.java     |  13 +-
 .../yarn/conf/TestYarnConfigurationFields.java  |   9 +
 .../src/main/resources/yarn-default.xml         |  53 ----
 ...ActiveStandbyElectorBasedElectorService.java |   5 +-
 .../yarn/server/resourcemanager/RMZKUtils.java  |  81 -----
 .../server/resourcemanager/ResourceManager.java |  83 +++---
 .../recovery/ZKRMStateStore.java                |  38 ++-
 .../server/resourcemanager/RMHATestBase.java    |   5 +-
 13 files changed, 567 insertions(+), 203 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/bbbf0e2a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
----------------------------------------------------------------------
diff --git 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
index e53f71e..0da4bbd 100644
--- 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
+++ 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
@@ -377,4 +377,25 @@ public class CommonConfigurationKeys extends 
CommonConfigurationKeysPublic {
 
   // HDFS client HTrace configuration.
   public static final String  FS_CLIENT_HTRACE_PREFIX = "fs.client.htrace.";
+
+  // Global ZooKeeper configuration keys
+  public static final String ZK_PREFIX = "hadoop.zk.";
+  /** ACL for the ZooKeeper ensemble. */
+  public static final String ZK_ACL = ZK_PREFIX + "acl";
+  public static final String ZK_ACL_DEFAULT = "world:anyone:rwcda";
+  /** Authentication for the ZooKeeper ensemble. */
+  public static final String ZK_AUTH = ZK_PREFIX + "auth";
+
+  /** Address of the ZooKeeper ensemble. */
+  public static final String ZK_ADDRESS = ZK_PREFIX + "address";
+  /** Maximum number of retries for a ZooKeeper operation. */
+  public static final String ZK_NUM_RETRIES = ZK_PREFIX + "num-retries";
+  public static final int    ZK_NUM_RETRIES_DEFAULT = 1000;
+  /** ZooKeeper session timeout in milliseconds. */
+  public static final String ZK_TIMEOUT_MS = ZK_PREFIX + "timeout-ms";
+  public static final int    ZK_TIMEOUT_MS_DEFAULT = 10000;
+  /** Interval between retries of a ZooKeeper operation in milliseconds. */
+  public static final String ZK_RETRY_INTERVAL_MS =
+      ZK_PREFIX + "retry-interval-ms";
+  public static final int    ZK_RETRY_INTERVAL_MS_DEFAULT = 1000;
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bbbf0e2a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/curator/ZKCuratorManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/curator/ZKCuratorManager.java
 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/curator/ZKCuratorManager.java
new file mode 100644
index 0000000..3adf028
--- /dev/null
+++ 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/curator/ZKCuratorManager.java
@@ -0,0 +1,294 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.util.curator;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.curator.framework.AuthInfo;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryNTimes;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.util.ZKUtil;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.data.ACL;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Helper class that provides utility methods specific to ZK operations.
+ */
+@InterfaceAudience.Private
+public final class ZKCuratorManager {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ZKCuratorManager.class);
+
+  /** Configuration for the ZooKeeper connection. */
+  private final Configuration conf;
+
+  /** Curator for ZooKeeper. */
+  private CuratorFramework curator;
+
+
+  public ZKCuratorManager(Configuration config) throws IOException {
+    this.conf = config;
+  }
+
+  /**
+   * Get the curator framework managing the ZooKeeper connection.
+   * @return Curator framework.
+   */
+  public CuratorFramework getCurator() {
+    return curator;
+  }
+
+  /**
+   * Close the connection with ZooKeeper.
+   */
+  public void close() {
+    if (curator != null) {
+      curator.close();
+    }
+  }
+
+  /**
+   * Utility method to fetch the ZK ACLs from the configuration.
+   * @throws java.io.IOException if the Zookeeper ACLs configuration file
+   * cannot be read
+   */
+  public static List<ACL> getZKAcls(Configuration conf) throws IOException {
+    // Parse authentication from configuration.
+    String zkAclConf = conf.get(CommonConfigurationKeys.ZK_ACL,
+        CommonConfigurationKeys.ZK_ACL_DEFAULT);
+    try {
+      zkAclConf = ZKUtil.resolveConfIndirection(zkAclConf);
+      return ZKUtil.parseACLs(zkAclConf);
+    } catch (IOException | ZKUtil.BadAclFormatException e) {
+      LOG.error("Couldn't read ACLs based on {}",
+          CommonConfigurationKeys.ZK_ACL);
+      throw e;
+    }
+  }
+
+  /**
+   * Utility method to fetch ZK auth info from the configuration.
+   * @throws java.io.IOException if the ZooKeeper auth configuration file
+   * cannot be read
+   */
+  public static List<ZKUtil.ZKAuthInfo> getZKAuths(Configuration conf)
+      throws IOException {
+    String zkAuthConf = conf.get(CommonConfigurationKeys.ZK_AUTH);
+    try {
+      zkAuthConf = ZKUtil.resolveConfIndirection(zkAuthConf);
+      if (zkAuthConf != null) {
+        return ZKUtil.parseAuth(zkAuthConf);
+      } else {
+        return Collections.emptyList();
+      }
+    } catch (IOException | ZKUtil.BadAuthFormatException e) {
+      LOG.error("Couldn't read Auth based on {}",
+          CommonConfigurationKeys.ZK_AUTH);
+      throw e;
+    }
+  }
+
+  /**
+   * Start the connection to the ZooKeeper ensemble using the configuration
+   * supplied to the constructor.
+   * @throws IOException If the connection cannot be started.
+   */
+  public void start() throws IOException {
+    this.start(new ArrayList<>());
+  }
+
+  /**
+   * Start the connection to the ZooKeeper ensemble using the configuration
+   * supplied to the constructor.
+   * @param authInfos List of authentication keys.
+   * @throws IOException If the connection cannot be started.
+   */
+  public void start(List<AuthInfo> authInfos) throws IOException {
+
+    // Connect to the ZooKeeper ensemble
+    String zkHostPort = conf.get(CommonConfigurationKeys.ZK_ADDRESS);
+    if (zkHostPort == null) {
+      throw new IOException(
+          CommonConfigurationKeys.ZK_ADDRESS + " is not configured.");
+    }
+    int numRetries = conf.getInt(CommonConfigurationKeys.ZK_NUM_RETRIES,
+        CommonConfigurationKeys.ZK_NUM_RETRIES_DEFAULT);
+    int zkSessionTimeout = conf.getInt(CommonConfigurationKeys.ZK_TIMEOUT_MS,
+        CommonConfigurationKeys.ZK_TIMEOUT_MS_DEFAULT);
+    int zkRetryInterval = conf.getInt(
+        CommonConfigurationKeys.ZK_RETRY_INTERVAL_MS,
+        CommonConfigurationKeys.ZK_RETRY_INTERVAL_MS_DEFAULT);
+    RetryNTimes retryPolicy = new RetryNTimes(numRetries, zkRetryInterval);
+
+    // Set up ZK auths
+    List<ZKUtil.ZKAuthInfo> zkAuths = getZKAuths(conf);
+    if (authInfos == null) {
+      authInfos = new ArrayList<>();
+    }
+    for (ZKUtil.ZKAuthInfo zkAuth : zkAuths) {
+      authInfos.add(new AuthInfo(zkAuth.getScheme(), zkAuth.getAuth()));
+    }
+
+    CuratorFramework client = CuratorFrameworkFactory.builder()
+        .connectString(zkHostPort)
+        .sessionTimeoutMs(zkSessionTimeout)
+        .retryPolicy(retryPolicy)
+        .authorization(authInfos)
+        .build();
+    client.start();
+
+    this.curator = client;
+  }
+
+  /**
+   * Get ACLs for a ZNode.
+   * @param path Path of the ZNode.
+   * @return The list of ACLs.
+   * @throws Exception If it cannot contact ZooKeeper.
+   */
+  public List<ACL> getACL(final String path) throws Exception {
+    return curator.getACL().forPath(path);
+  }
+
+  /**
+   * Get the data in a ZNode as raw bytes. The statistics of the ZNode are
+   * not returned.
+   * @param path Path of the ZNode.
+   * @return The data in the ZNode.
+   * @throws Exception If it cannot contact Zookeeper.
+   */
+  public byte[] getData(final String path) throws Exception {
+    return curator.getData().forPath(path);
+  }
+
+  /**
+   * Get the data in a ZNode as a String, decoding the bytes as UTF-8.
+   * The statistics of the ZNode are not returned.
+   * @param path Path of the ZNode.
+   * @return The data in the ZNode.
+   * @throws Exception If it cannot contact Zookeeper.
+   */
+  public String getSringData(final String path) throws Exception {
+    byte[] bytes = getData(path);
+    return new String(bytes, Charset.forName("UTF-8"));
+  }
+
+  /**
+   * Set data into a ZNode.
+   * @param path Path of the ZNode.
+   * @param data Data to set.
+   * @param version Version of the data to store.
+   * @throws Exception If it cannot contact Zookeeper.
+   */
+  public void setData(String path, byte[] data, int version) throws Exception {
+    curator.setData().withVersion(version).forPath(path, data);
+  }
+
+  /**
+   * Set data into a ZNode.
+   * @param path Path of the ZNode.
+   * @param data Data to set as String.
+   * @param version Version of the data to store.
+   * @throws Exception If it cannot contact Zookeeper.
+   */
+  public void setData(String path, String data, int version) throws Exception {
+    byte[] bytes = data.getBytes(Charset.forName("UTF-8"));
+    setData(path, bytes, version);
+  }
+
+  /**
+   * Get children of a ZNode.
+   * @param path Path of the ZNode.
+   * @return The list of children.
+   * @throws Exception If it cannot contact Zookeeper.
+   */
+  public List<String> getChildren(final String path) throws Exception {
+    return curator.getChildren().forPath(path);
+  }
+
+  /**
+   * Check if a ZNode exists.
+   * @param path Path of the ZNode.
+   * @return If the ZNode exists.
+   * @throws Exception If it cannot contact Zookeeper.
+   */
+  public boolean exists(final String path) throws Exception {
+    return curator.checkExists().forPath(path) != null;
+  }
+
+  /**
+   * Create a ZNode.
+   * @param path Path of the ZNode.
+   * @return If the ZNode was created.
+   * @throws Exception If it cannot contact Zookeeper.
+   */
+  public boolean create(final String path) throws Exception {
+    return create(path, null);
+  }
+
+  /**
+   * Create a ZNode.
+   * @param path Path of the ZNode.
+   * @param zkAcl ACL for the node.
+   * @return If the ZNode was created.
+   * @throws Exception If it cannot contact Zookeeper.
+   */
+  public boolean create(final String path, List<ACL> zkAcl) throws Exception {
+    boolean created = false;
+    if (!exists(path)) {
+      curator.create()
+          .withMode(CreateMode.PERSISTENT)
+          .withACL(zkAcl)
+          .forPath(path, null);
+      created = true;
+    }
+    return created;
+  }
+
+  /**
+   * Delete a ZNode.
+   * @param path Path of the ZNode.
+   * @throws Exception If it cannot contact ZooKeeper.
+   */
+  public void delete(final String path) throws Exception {
+    if (exists(path)) {
+      curator.delete().deletingChildrenIfNeeded().forPath(path);
+    }
+  }
+
+  /**
+   * Get the path for a ZNode.
+   * @param root Root of the ZNode.
+   * @param nodeName Name of the ZNode.
+   * @return Path for the ZNode.
+   */
+  public static String getNodePath(String root, String nodeName) {
+    return root + "/" + nodeName;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bbbf0e2a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/curator/package-info.java
----------------------------------------------------------------------
diff --git 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/curator/package-info.java
 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/curator/package-info.java
new file mode 100644
index 0000000..7b93f5a
--- /dev/null
+++ 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/curator/package-info.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This package provides utilities to interact with Curator ZooKeeper.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+package org.apache.hadoop.util.curator;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bbbf0e2a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
----------------------------------------------------------------------
diff --git 
a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml 
b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
index e6b6919..ffcab2c 100644
--- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
+++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
@@ -2675,4 +2675,50 @@
       This determines the number of open file handles.
     </description>
   </property>
+
+  <property>
+    <description>Host:Port of the ZooKeeper server to be used.
+    </description>
+    <name>hadoop.zk.address</name>
+    <!--value>127.0.0.1:2181</value-->
+  </property>
+
+  <property>
+    <description>Number of tries to connect to ZooKeeper.</description>
+    <name>hadoop.zk.num-retries</name>
+    <value>1000</value>
+  </property>
+
+  <property>
+    <description>Retry interval in milliseconds when connecting to ZooKeeper.
+    </description>
+    <name>hadoop.zk.retry-interval-ms</name>
+    <value>1000</value>
+  </property>
+
+  <property>
+    <description>ZooKeeper session timeout in milliseconds. Session expiration
+    is managed by the ZooKeeper cluster itself, not by the client. This value is
+    used by the cluster to determine when the client's session expires.
+    Expiration happens when the cluster does not hear from the client within
+    the specified session timeout period (i.e. no heartbeat).</description>
+    <name>hadoop.zk.timeout-ms</name>
+    <value>10000</value>
+  </property>
+
+  <property>
+    <description>ACLs to be used for ZooKeeper znodes.</description>
+    <name>hadoop.zk.acl</name>
+    <value>world:anyone:rwcda</value>
+  </property>
+
+  <property>
+    <description>
+        Specify the auths to be used for the ACLs specified in hadoop.zk.acl.
+        This takes a comma-separated list of authentication mechanisms, each of the
+        form 'scheme:auth' (the same syntax used for the 'addAuth' command in
+        the ZK CLI).
+    </description>
+    <name>hadoop.zk.auth</name>
+  </property>
 </configuration>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bbbf0e2a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/curator/TestZKCuratorManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/curator/TestZKCuratorManager.java
 
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/curator/TestZKCuratorManager.java
new file mode 100644
index 0000000..2bcf508
--- /dev/null
+++ 
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/curator/TestZKCuratorManager.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.util.curator;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.List;
+
+import org.apache.curator.test.TestingServer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test the manager for ZooKeeper Curator.
+ */
+public class TestZKCuratorManager {
+
+  private TestingServer server;
+  private ZKCuratorManager curator;
+
+  @Before
+  public void setup() throws Exception {
+    this.server = new TestingServer();
+
+    Configuration conf = new Configuration();
+    conf.set(
+        CommonConfigurationKeys.ZK_ADDRESS, this.server.getConnectString());
+
+    this.curator = new ZKCuratorManager(conf);
+    this.curator.start();
+  }
+
+  @After
+  public void teardown() throws Exception {
+    this.curator.close();
+    if (this.server != null) {
+      this.server.close();
+      this.server = null;
+    }
+  }
+
+  @Test
+  public void testReadWriteData() throws Exception {
+    String testZNode = "/test";
+    String expectedString = "testString";
+    assertFalse(curator.exists(testZNode));
+    curator.create(testZNode);
+    assertTrue(curator.exists(testZNode));
+    curator.setData(testZNode, expectedString, -1);
+    String testString = curator.getSringData("/test");
+    assertEquals(expectedString, testString);
+  }
+
+  @Test
+  public void testChildren() throws Exception {
+    List<String> children = curator.getChildren("/");
+    assertEquals(1, children.size());
+
+    assertFalse(curator.exists("/node1"));
+    curator.create("/node1");
+    assertTrue(curator.exists("/node1"));
+
+    assertFalse(curator.exists("/node2"));
+    curator.create("/node2");
+    assertTrue(curator.exists("/node2"));
+
+    children = curator.getChildren("/");
+    assertEquals(3, children.size());
+
+    curator.delete("/node2");
+    assertFalse(curator.exists("/node2"));
+    children = curator.getChildren("/");
+    assertEquals(2, children.size());
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bbbf0e2a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 71a7134..cd4d569 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -29,6 +29,7 @@ import 
org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Evolving;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.ha.ActiveStandbyElector;
 import org.apache.hadoop.http.HttpConfig;
 import org.apache.hadoop.net.NetUtils;
@@ -87,7 +88,17 @@ public class YarnConfiguration extends Configuration {
     });
     Configuration.addDeprecations(new DeprecationDelta[] {
         new DeprecationDelta(RM_SYSTEM_METRICS_PUBLISHER_ENABLED,
-            SYSTEM_METRICS_PUBLISHER_ENABLED)
+            SYSTEM_METRICS_PUBLISHER_ENABLED),
+        new DeprecationDelta(RM_ZK_ACL, CommonConfigurationKeys.ZK_ACL),
+        new DeprecationDelta(RM_ZK_AUTH, CommonConfigurationKeys.ZK_AUTH),
+        new DeprecationDelta(RM_ZK_ADDRESS,
+            CommonConfigurationKeys.ZK_ADDRESS),
+        new DeprecationDelta(RM_ZK_NUM_RETRIES,
+            CommonConfigurationKeys.ZK_NUM_RETRIES),
+        new DeprecationDelta(RM_ZK_TIMEOUT_MS,
+            CommonConfigurationKeys.ZK_TIMEOUT_MS),
+        new DeprecationDelta(RM_ZK_RETRY_INTERVAL_MS,
+            CommonConfigurationKeys.ZK_RETRY_INTERVAL_MS),
     });
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bbbf0e2a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
index a035dc7..b9ad31a 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
@@ -121,6 +121,15 @@ public class TestYarnConfigurationFields extends 
TestConfigurationFieldsBase {
     configurationPropsToSkipCompare
         .add(YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_ENABLED);
 
+    // skip deprecated ZooKeeper settings
+    configurationPropsToSkipCompare.add(YarnConfiguration.RM_ZK_ADDRESS);
+    configurationPropsToSkipCompare.add(YarnConfiguration.RM_ZK_NUM_RETRIES);
+    configurationPropsToSkipCompare.add(YarnConfiguration.RM_ZK_TIMEOUT_MS);
+    configurationPropsToSkipCompare.add(
+        YarnConfiguration.RM_ZK_RETRY_INTERVAL_MS);
+    configurationPropsToSkipCompare.add(YarnConfiguration.RM_ZK_AUTH);
+    configurationPropsToSkipCompare.add(YarnConfiguration.RM_ZK_ACL);
+
     // Used as Java command line properties, not XML
     configurationPrefixToSkipCompare.add("yarn.app.container");
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bbbf0e2a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 000e892..dbf115b 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -452,31 +452,6 @@
   </property>
 
   <property>
-    <description>Host:Port of the ZooKeeper server to be used by the RM. This
-      must be supplied when using the ZooKeeper based implementation of the
-      RM state store and/or embedded automatic failover in an HA setting.
-    </description>
-    <name>yarn.resourcemanager.zk-address</name>
-    <!--value>127.0.0.1:2181</value-->
-  </property>
-
-  <property>
-    <description>Number of times RM tries to connect to ZooKeeper.</description>
-    <name>yarn.resourcemanager.zk-num-retries</name>
-    <value>1000</value>
-  </property>
-
-  <property>
-    <description>Retry interval in milliseconds when connecting to ZooKeeper.
-      When HA is enabled, the value here is NOT used. It is generated
-      automatically from yarn.resourcemanager.zk-timeout-ms and
-      yarn.resourcemanager.zk-num-retries.
-    </description>
-    <name>yarn.resourcemanager.zk-retry-interval-ms</name>
-    <value>1000</value>
-  </property>
-
-  <property>
     <description>Full path of the ZooKeeper znode where RM state will be
     stored. This must be supplied when using
     org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore
@@ -486,22 +461,6 @@
   </property>
 
   <property>
-    <description>ZooKeeper session timeout in milliseconds. Session expiration
-    is managed by the ZooKeeper cluster itself, not by the client. This value is
-    used by the cluster to determine when the client's session expires.
-    Expirations happens when the cluster does not hear from the client within
-    the specified session timeout period (i.e. no heartbeat).</description>
-    <name>yarn.resourcemanager.zk-timeout-ms</name>
-    <value>10000</value>
-  </property>
-
-  <property>
-    <description>ACL's to be used for ZooKeeper znodes.</description>
-    <name>yarn.resourcemanager.zk-acl</name>
-    <value>world:anyone:rwcda</value>
-  </property>
-
-  <property>
     <description>
       ACLs to be used for the root znode when using ZKRMStateStore in an HA
       scenario for fencing.
@@ -527,18 +486,6 @@
   </property>
 
   <property>
-    <description>
-        Specify the auths to be used for the ACL's specified in both the
-        yarn.resourcemanager.zk-acl and
-        yarn.resourcemanager.zk-state-store.root-node.acl properties.  This
-        takes a comma-separated list of authentication mechanisms, each of the
-        form 'scheme:auth' (the same syntax used for the 'addAuth' command in
-        the ZK CLI).
-    </description>
-    <name>yarn.resourcemanager.zk-auth</name>
-  </property>
-
-  <property>
     <description>URI pointing to the location of the FileSystem path where
     RM state will be stored. This must be supplied when using
     
org.apache.hadoop.yarn.server.resourcemanager.recovery.FileSystemRMStateStore

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bbbf0e2a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ActiveStandbyElectorBasedElectorService.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ActiveStandbyElectorBasedElectorService.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ActiveStandbyElectorBasedElectorService.java
index a8dcda4..c5c9211 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ActiveStandbyElectorBasedElectorService.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ActiveStandbyElectorBasedElectorService.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.ha.ServiceFailedException;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.ZKUtil;
+import org.apache.hadoop.util.curator.ZKCuratorManager;
 import org.apache.hadoop.yarn.conf.HAUtil;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
@@ -96,8 +97,8 @@ public class ActiveStandbyElectorBasedElectorService extends 
AbstractService
     zkSessionTimeout = conf.getLong(YarnConfiguration.RM_ZK_TIMEOUT_MS,
         YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS);
 
-    List<ACL> zkAcls = RMZKUtils.getZKAcls(conf);
-    List<ZKUtil.ZKAuthInfo> zkAuths = RMZKUtils.getZKAuths(conf);
+    List<ACL> zkAcls = ZKCuratorManager.getZKAcls(conf);
+    List<ZKUtil.ZKAuthInfo> zkAuths = ZKCuratorManager.getZKAuths(conf);
 
     int maxRetryNum =
         conf.getInt(YarnConfiguration.RM_HA_FC_ELECTOR_ZK_RETRIES_KEY, conf

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bbbf0e2a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMZKUtils.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMZKUtils.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMZKUtils.java
deleted file mode 100644
index 4b8561d..0000000
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMZKUtils.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager;
-
-import java.io.IOException;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.util.ZKUtil;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.zookeeper.data.ACL;
-
-import java.util.Collections;
-import java.util.List;
-
-/**
- * Helper class that provides utility methods specific to ZK operations
- */
-@InterfaceAudience.Private
-public class RMZKUtils {
-  private static final Log LOG = LogFactory.getLog(RMZKUtils.class);
-
-  /**
-   * Utility method to fetch the ZK ACLs from the configuration.
-   *
-   * @throws java.io.IOException if the Zookeeper ACLs configuration file
-   * cannot be read
-   */
-  public static List<ACL> getZKAcls(Configuration conf) throws IOException {
-    // Parse authentication from configuration.
-    String zkAclConf =
-        conf.get(YarnConfiguration.RM_ZK_ACL,
-            YarnConfiguration.DEFAULT_RM_ZK_ACL);
-    try {
-      zkAclConf = ZKUtil.resolveConfIndirection(zkAclConf);
-      return ZKUtil.parseACLs(zkAclConf);
-    } catch (IOException | ZKUtil.BadAclFormatException e) {
-      LOG.error("Couldn't read ACLs based on " + YarnConfiguration.RM_ZK_ACL);
-      throw e;
-    }
-  }
-
-  /**
-   * Utility method to fetch ZK auth info from the configuration.
-   *
-   * @throws java.io.IOException if the Zookeeper ACLs configuration file
-   * cannot be read
-   */
-  public static List<ZKUtil.ZKAuthInfo> getZKAuths(Configuration conf)
-      throws IOException {
-    String zkAuthConf = conf.get(YarnConfiguration.RM_ZK_AUTH);
-    try {
-      zkAuthConf = ZKUtil.resolveConfIndirection(zkAuthConf);
-      if (zkAuthConf != null) {
-        return ZKUtil.parseAuth(zkAuthConf);
-      } else {
-        return Collections.emptyList();
-      }
-    } catch (IOException | ZKUtil.BadAuthFormatException e) {
-      LOG.error("Couldn't read Auth based on " + YarnConfiguration.RM_ZK_AUTH);
-      throw e;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bbbf0e2a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
index 9691885..cb7daf9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
@@ -22,8 +22,6 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.curator.framework.AuthInfo;
 import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.retry.RetryNTimes;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.ha.HAServiceProtocol;
@@ -46,7 +44,7 @@ import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.ShutdownHookManager;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.VersionInfo;
-import org.apache.hadoop.util.ZKUtil;
+import org.apache.hadoop.util.curator.ZKCuratorManager;
 import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -192,7 +190,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
   protected ResourceTrackerService resourceTracker;
   private JvmMetrics jvmMetrics;
   private boolean curatorEnabled = false;
-  private CuratorFramework curator;
+  private ZKCuratorManager zkManager;
   private final String zkRootNodePassword =
       Long.toString(new SecureRandom().nextLong());
   private boolean recoveryEnabled;
@@ -345,7 +343,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
         conf.getBoolean(YarnConfiguration.CURATOR_LEADER_ELECTOR,
             YarnConfiguration.DEFAULT_CURATOR_LEADER_ELECTOR_ENABLED);
     if (curatorEnabled) {
-      this.curator = createAndStartCurator(conf);
+      this.zkManager = createAndStartZKManager(conf);
       elector = new CuratorBasedElectorService(this);
     } else {
       elector = new ActiveStandbyElectorBasedElectorService(this);
@@ -353,50 +351,49 @@ public class ResourceManager extends CompositeService implements Recoverable {
     return elector;
   }
 
-  public CuratorFramework createAndStartCurator(Configuration conf)
+  /**
+   * Create and start the ZooKeeper Curator manager.
+   * @param config Configuration for the ZooKeeper curator.
+   * @return New ZooKeeper Curator manager.
+   * @throws IOException If it cannot create the manager.
+   */
+  public ZKCuratorManager createAndStartZKManager(Configuration config)
       throws IOException {
-    String zkHostPort = conf.get(YarnConfiguration.RM_ZK_ADDRESS);
-    if (zkHostPort == null) {
-      throw new YarnRuntimeException(
-          YarnConfiguration.RM_ZK_ADDRESS + " is not configured.");
-    }
-    int numRetries = conf.getInt(YarnConfiguration.RM_ZK_NUM_RETRIES,
-        YarnConfiguration.DEFAULT_ZK_RM_NUM_RETRIES);
-    int zkSessionTimeout = conf.getInt(YarnConfiguration.RM_ZK_TIMEOUT_MS,
-        YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS);
-    int zkRetryInterval = conf.getInt(YarnConfiguration.RM_ZK_RETRY_INTERVAL_MS,
-        YarnConfiguration.DEFAULT_RM_ZK_RETRY_INTERVAL_MS);
-
-    // set up zk auths
-    List<ZKUtil.ZKAuthInfo> zkAuths = RMZKUtils.getZKAuths(conf);
+    ZKCuratorManager manager = new ZKCuratorManager(config);
+
+    // Get authentication
     List<AuthInfo> authInfos = new ArrayList<>();
-    for (ZKUtil.ZKAuthInfo zkAuth : zkAuths) {
-      authInfos.add(new AuthInfo(zkAuth.getScheme(), zkAuth.getAuth()));
+    if (HAUtil.isHAEnabled(config) && HAUtil.getConfValueForRMInstance(
+        YarnConfiguration.ZK_RM_STATE_STORE_ROOT_NODE_ACL, config) == null) {
+      String zkRootNodeUsername = HAUtil.getConfValueForRMInstance(
+          YarnConfiguration.RM_ADDRESS,
+          YarnConfiguration.DEFAULT_RM_ADDRESS, config);
+      String defaultFencingAuth =
+          zkRootNodeUsername + ":" + zkRootNodePassword;
+      byte[] defaultFencingAuthData =
+          defaultFencingAuth.getBytes(Charset.forName("UTF-8"));
+      String scheme = new DigestAuthenticationProvider().getScheme();
+      AuthInfo authInfo = new AuthInfo(scheme, defaultFencingAuthData);
+      authInfos.add(authInfo);
     }
 
-    if (HAUtil.isHAEnabled(conf) && HAUtil.getConfValueForRMInstance(
-        YarnConfiguration.ZK_RM_STATE_STORE_ROOT_NODE_ACL, conf) == null) {
-      String zkRootNodeUsername = HAUtil
-          .getConfValueForRMInstance(YarnConfiguration.RM_ADDRESS,
-              YarnConfiguration.DEFAULT_RM_ADDRESS, conf);
-      byte[] defaultFencingAuth =
-          (zkRootNodeUsername + ":" + zkRootNodePassword)
-              .getBytes(Charset.forName("UTF-8"));
-      authInfos.add(new AuthInfo(new DigestAuthenticationProvider().getScheme(),
-          defaultFencingAuth));
-    }
+    manager.start(authInfos);
+    return manager;
+  }
 
-    CuratorFramework client =  CuratorFrameworkFactory.builder()
-        .connectString(zkHostPort)
-        .sessionTimeoutMs(zkSessionTimeout)
-        .retryPolicy(new RetryNTimes(numRetries, zkRetryInterval))
-        .authorization(authInfos).build();
-    client.start();
-    return client;
+  /**
+   * Get the ZooKeeper Curator manager.
+   * @return ZooKeeper Curator manager.
+   */
+  public ZKCuratorManager getZKManager() {
+    return this.zkManager;
   }
 
   public CuratorFramework getCurator() {
-    return this.curator;
+    if (this.zkManager == null) {
+      return null;
+    }
+    return this.zkManager.getCurator();
   }
 
   public String getZkRootNodePassword() {
@@ -1264,8 +1261,8 @@ public class ResourceManager extends CompositeService 
implements Recoverable {
       configurationProvider.close();
     }
     super.serviceStop();
-    if (curator != null) {
-      curator.close();
+    if (zkManager != null) {
+      zkManager.close();
     }
     transitionToStandby(false);
     rmContext.setHAServiceState(HAServiceState.STOPPING);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bbbf0e2a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
index 1b3b367..4b6e82c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.security.token.delegation.DelegationKey;
 import org.apache.hadoop.util.ZKUtil;
+import org.apache.hadoop.util.curator.ZKCuratorManager;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ReservationId;
@@ -46,7 +47,6 @@ import org.apache.hadoop.yarn.proto.YarnProtos.ReservationAllocationStateProto;
 import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
 import org.apache.hadoop.yarn.server.records.Version;
 import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
-import org.apache.hadoop.yarn.server.resourcemanager.RMZKUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
@@ -201,8 +201,8 @@ public class ZKRMStateStore extends RMStateStore {
   private final String zkRootNodeAuthScheme =
       new DigestAuthenticationProvider().getScheme();
 
-  @VisibleForTesting
-  protected CuratorFramework curatorFramework;
+  /** Manager for the ZooKeeper connection. */
+  private ZKCuratorManager zkManager;
 
   /*
    * Indicates different app attempt state store operations.
@@ -298,12 +298,11 @@ public class ZKRMStateStore extends RMStateStore {
       appIdNodeSplitIndex = YarnConfiguration.DEFAULT_ZK_APPID_NODE_SPLIT_INDEX;
     }
 
-    zkAcl = RMZKUtils.getZKAcls(conf);
+    zkAcl = ZKCuratorManager.getZKAcls(conf);
 
     if (HAUtil.isHAEnabled(conf)) {
       String zkRootNodeAclConf = HAUtil.getConfValueForRMInstance
           (YarnConfiguration.ZK_RM_STATE_STORE_ROOT_NODE_ACL, conf);
-
       if (zkRootNodeAclConf != null) {
         zkRootNodeAclConf = ZKUtil.resolveConfIndirection(zkRootNodeAclConf);
 
@@ -330,10 +329,9 @@ public class ZKRMStateStore extends RMStateStore {
     amrmTokenSecretManagerRoot =
         getNodePath(zkRootNodePath, AMRMTOKEN_SECRET_MANAGER_ROOT);
     reservationRoot = getNodePath(zkRootNodePath, RESERVATION_SYSTEM_ROOT);
-    curatorFramework = resourceManager.getCurator();
-
-    if (curatorFramework == null) {
-      curatorFramework = resourceManager.createAndStartCurator(conf);
+    zkManager = resourceManager.getZKManager();
+    if (zkManager == null) {
+      zkManager = resourceManager.createAndStartZKManager(conf);
     }
   }
 
@@ -382,6 +380,7 @@ public class ZKRMStateStore extends RMStateStore {
       logRootNodeAcls("Before setting ACLs'\n");
     }
 
+    CuratorFramework curatorFramework = zkManager.getCurator();
     if (HAUtil.isHAEnabled(getConfig())) {
       curatorFramework.setACL().withACL(zkRootNodeAcl).forPath(zkRootNodePath);
     } else {
@@ -401,6 +400,7 @@ public class ZKRMStateStore extends RMStateStore {
     }
 
     if (!HAUtil.isHAEnabled(getConfig())) {
+      CuratorFramework curatorFramework = zkManager.getCurator();
       IOUtils.closeStream(curatorFramework);
     }
   }
@@ -936,6 +936,7 @@ public class ZKRMStateStore extends RMStateStore {
       }
       safeDelete(appIdRemovePath);
     } else {
+      CuratorFramework curatorFramework = zkManager.getCurator();
       curatorFramework.delete().deletingChildrenIfNeeded().
           forPath(appIdRemovePath);
     }
@@ -1236,38 +1237,32 @@ public class ZKRMStateStore extends RMStateStore {
 
   @VisibleForTesting
   byte[] getData(final String path) throws Exception {
-    return curatorFramework.getData().forPath(path);
+    return zkManager.getData(path);
   }
 
   @VisibleForTesting
   List<ACL> getACL(final String path) throws Exception {
-    return curatorFramework.getACL().forPath(path);
+    return zkManager.getACL(path);
   }
 
   @VisibleForTesting
   List<String> getChildren(final String path) throws Exception {
-    return curatorFramework.getChildren().forPath(path);
+    return zkManager.getChildren(path);
   }
 
   @VisibleForTesting
   boolean exists(final String path) throws Exception {
-    return curatorFramework.checkExists().forPath(path) != null;
+    return zkManager.exists(path);
   }
 
   @VisibleForTesting
   void create(final String path) throws Exception {
-    if (!exists(path)) {
-      curatorFramework.create()
-          .withMode(CreateMode.PERSISTENT).withACL(zkAcl)
-          .forPath(path, null);
-    }
+    zkManager.create(path, zkAcl);
   }
 
   @VisibleForTesting
   void delete(final String path) throws Exception {
-    if (exists(path)) {
-      curatorFramework.delete().deletingChildrenIfNeeded().forPath(path);
-    }
+    zkManager.delete(path);
   }
 
   private void safeCreate(String path, byte[] data, List<ACL> acl,
@@ -1310,6 +1305,7 @@ public class ZKRMStateStore extends RMStateStore {
     private CuratorTransactionFinal transactionFinal;
 
     SafeTransaction() throws Exception {
+      CuratorFramework curatorFramework = zkManager.getCurator();
       CuratorTransaction transaction = curatorFramework.inTransaction();
       transactionFinal = transaction.create()
           .withMode(CreateMode.PERSISTENT).withACL(zkAcl)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bbbf0e2a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/RMHATestBase.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/RMHATestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/RMHATestBase.java
index c95bcdf..4d8b20d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/RMHATestBase.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/RMHATestBase.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager;
 import java.io.IOException;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.ha.ClientBaseWithFixes;
 import org.apache.hadoop.ha.HAServiceProtocol;
 import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
@@ -61,8 +62,8 @@ public abstract class RMHATestBase extends ClientBaseWithFixes{
     configuration.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
     configuration.set(YarnConfiguration.RM_STORE,
         ZKRMStateStore.class.getName());
-    configuration.set(YarnConfiguration.RM_ZK_ADDRESS, hostPort);
-    configuration.setInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, ZK_TIMEOUT_MS);
+    configuration.set(CommonConfigurationKeys.ZK_ADDRESS, hostPort);
+    configuration.setInt(CommonConfigurationKeys.ZK_TIMEOUT_MS, ZK_TIMEOUT_MS);
     configuration.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
     configuration.set(YarnConfiguration.RM_CLUSTER_ID, "test-yarn-cluster");
     int base = 100;


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to