Repository: hadoop
Updated Branches:
  refs/heads/trunk 6542d17ea -> 1606dad13


YARN-7497. Add file system based scheduler configuration store. Contributed by 
Jiandan Yang.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/1606dad1
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/1606dad1
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/1606dad1

Branch: refs/heads/trunk
Commit: 1606dad133de0dbee59175509a22994ec570ea41
Parents: 6542d17
Author: Weiwei Yang <w...@apache.org>
Authored: Fri Mar 30 21:41:33 2018 +0800
Committer: Weiwei Yang <w...@apache.org>
Committed: Fri Mar 30 21:41:33 2018 +0800

----------------------------------------------------------------------
 .../hadoop/yarn/conf/YarnConfiguration.java     |  16 +-
 .../src/main/resources/yarn-default.xml         |  22 ++
 .../scheduler/capacity/CapacityScheduler.java   |   1 +
 .../conf/FSSchedulerConfigurationStore.java     | 303 +++++++++++++++++++
 .../conf/MutableCSConfigurationProvider.java    |   3 +
 .../capacity/conf/YarnConfigurationStore.java   |   2 +-
 .../conf/TestFSSchedulerConfigurationStore.java | 173 +++++++++++
 .../TestMutableCSConfigurationProvider.java     |  50 +++
 8 files changed, 568 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/1606dad1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 1f62bbd..42f2cae 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -776,12 +776,15 @@ public class YarnConfiguration extends Configuration {
   public static final String MEMORY_CONFIGURATION_STORE = "memory";
   @Private
   @Unstable
-  public static final String LEVELDB_CONFIGURATION_STORE = "leveldb";
+  public static final String FS_CONFIGURATION_STORE = "fs";
   @Private
   @Unstable
   public static final String ZK_CONFIGURATION_STORE = "zk";
   @Private
   @Unstable
+  public static final String LEVELDB_CONFIGURATION_STORE = "leveldb";
+  @Private
+  @Unstable
   public static final String DEFAULT_CONFIGURATION_STORE =
       FILE_CONFIGURATION_STORE;
   @Private
@@ -809,6 +812,17 @@ public class YarnConfiguration extends Configuration {
   @Private
   @Unstable
   public static final long DEFAULT_RM_SCHEDCONF_ZK_MAX_LOGS = 1000;
+  @Private
+  @Unstable
+  public static final String SCHEDULER_CONFIGURATION_FS_PATH =
+      YARN_PREFIX + "scheduler.configuration.fs.path";
+  @Private
+  @Unstable
+  public static final String SCHEDULER_CONFIGURATION_FS_MAX_VERSION =
+      YARN_PREFIX + "scheduler.configuration.max.version";
+  @Private
+  @Unstable
+  public static final int DEFAULT_SCHEDULER_CONFIGURATION_FS_MAX_VERSION = 100;
 
   /** Parent znode path under which ZKConfigurationStore will create znodes. */
   @Private

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1606dad1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 114ba4b..81b6658 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -3570,6 +3570,28 @@
 
   <property>
     <description>
+      The file system directory to store the configuration files. The path
+      can be any format as long as it follows a Hadoop-compatible scheme,
+      for example value "file:///path/to/dir" means to store files on local
+      file system, value "hdfs:///path/to/dir" means to store files on HDFS.
+      If resource manager HA is enabled, it is recommended to use the hdfs
+      scheme so that it works in fail-over scenarios.
+    </description>
+    <name>yarn.scheduler.configuration.fs.path</name>
+    <value>file://${hadoop.tmp.dir}/yarn/system/schedconf</value>
+  </property>
+
+  <property>
+    <description>
+      The maximum number of configuration file versions to keep in the
+      file system. Default is 100.
+    </description>
+    <name>yarn.scheduler.configuration.max.version</name>
+    <value>100</value>
+  </property>
+
+  <property>
+    <description>
       ZK root node path for configuration store when using zookeeper-based
       configuration store.
     </description>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1606dad1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index bf674a8..e59bdde 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -326,6 +326,7 @@ public class CapacityScheduler extends
       case YarnConfiguration.MEMORY_CONFIGURATION_STORE:
       case YarnConfiguration.LEVELDB_CONFIGURATION_STORE:
       case YarnConfiguration.ZK_CONFIGURATION_STORE:
+      case YarnConfiguration.FS_CONFIGURATION_STORE:
         this.csConfProvider = new MutableCSConfigurationProvider(rmContext);
         break;
       default:

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1606dad1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/FSSchedulerConfigurationStore.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/FSSchedulerConfigurationStore.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/FSSchedulerConfigurationStore.java
new file mode 100644
index 0000000..2a24887
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/FSSchedulerConfigurationStore.java
@@ -0,0 +1,303 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.gson.GsonBuilder;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.util.Time;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.records.Version;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+
+
+/**
+ * A file system based implementation of {@link YarnConfigurationStore}.
+ *
+ * <p>Every finalized scheduler configuration is kept as one XML file in the
+ * directory named by {@link YarnConfiguration#SCHEDULER_CONFIGURATION_FS_PATH}.
+ * The file name is {@link YarnConfiguration#CS_CONFIGURATION_FILE} followed by
+ * "." and the wall-clock time in milliseconds at which it was written. A
+ * configuration is first written to a file carrying an additional ".tmp"
+ * suffix and only becomes visible to readers once that file has been renamed
+ * to its final name. The "latest" configuration is the last entry of the
+ * sorted directory listing.
+ */
+public class FSSchedulerConfigurationStore extends YarnConfigurationStore {
+  public static final Log LOG = LogFactory.getLog(
+      FSSchedulerConfigurationStore.class);
+
+  @VisibleForTesting
+  protected static final Version CURRENT_VERSION_INFO
+      = Version.newInstance(0, 1);
+
+  /** Suffix marking a configuration file that has not been finalized yet. */
+  private static final String TMP = ".tmp";
+
+  // Number of finalized configuration files tolerated before old ones are
+  // deleted.
+  private int maxVersion;
+  // Directory holding all configuration files of this store.
+  private Path schedulerConfDir;
+  private FileSystem fileSystem;
+  // Mutation handed to logMutation() that still awaits confirmMutation().
+  private LogMutation pendingMutation;
+  // Accepts finalized configuration files only (temp files are rejected).
+  private PathFilter configFilePathFilter;
+  // In-memory copy of the scheduler configuration, including any pending
+  // mutation.
+  private volatile Configuration schedConf;
+  // Snapshot taken before the pending mutation was applied; used to roll
+  // back.
+  private volatile Configuration oldConf;
+  // Temp file written for the pending mutation, null when nothing is pending.
+  private Path tempConfigPath;
+
+  /**
+   * Prepare the store: resolve the configuration directory and its file
+   * system, create the directory when missing, seed it with
+   * {@code vSchedConf} when it holds no finalized configuration file, and
+   * load the latest stored configuration into memory.
+   *
+   * @param conf configuration providing the store directory and max version
+   * @param vSchedConf scheduler configuration used to seed an empty store
+   * @param rmContext not used by this implementation
+   * @throws Exception if the store directory is not configured, cannot be
+   *     created, or the configuration cannot be written or read
+   */
+  @Override
+  public void initialize(Configuration conf, Configuration vSchedConf,
+      RMContext rmContext) throws Exception {
+    this.configFilePathFilter = new PathFilter() {
+      @Override
+      public boolean accept(Path path) {
+        if (path == null) {
+          return false;
+        }
+        String pathName = path.getName();
+        return pathName.startsWith(YarnConfiguration.CS_CONFIGURATION_FILE)
+            && !pathName.endsWith(TMP);
+      }
+    };
+
+    String schedulerConfPathStr = conf.get(
+        YarnConfiguration.SCHEDULER_CONFIGURATION_FS_PATH);
+    if (schedulerConfPathStr == null || schedulerConfPathStr.isEmpty()) {
+      throw new IOException(
+          YarnConfiguration.SCHEDULER_CONFIGURATION_FS_PATH
+              + " must be set");
+    }
+    this.schedulerConfDir = new Path(schedulerConfPathStr);
+    this.fileSystem = this.schedulerConfDir.getFileSystem(conf);
+    this.maxVersion = conf.getInt(
+        YarnConfiguration.SCHEDULER_CONFIGURATION_FS_MAX_VERSION,
+        YarnConfiguration.DEFAULT_SCHEDULER_CONFIGURATION_FS_MAX_VERSION);
+    LOG.info("schedulerConfDir=" + schedulerConfPathStr);
+    LOG.info("capacity scheduler file max version = " + maxVersion);
+
+    if (!fileSystem.exists(schedulerConfDir)) {
+      if (!fileSystem.mkdirs(schedulerConfDir)) {
+        throw new IOException("mkdir " + schedulerConfPathStr + " failed");
+      }
+    }
+
+    // Create the first capacity-scheduler.xml.<timestamp> file if none
+    // exists. Only the path is looked up, so no stream is opened (and left
+    // unclosed) merely to test for existence.
+    if (getLatestConfigPath() == null) {
+      writeConfigurationToFileSystem(vSchedConf);
+    }
+
+    this.schedConf = this.getConfigurationFromFileSystem();
+  }
+
+  /**
+   * Update and persist latest configuration in temp file.
+   * @param logMutation configuration change to be persisted in write ahead log
+   * @throws IOException throw IOE when write temp configuration file fail
+   */
+  @Override
+  public void logMutation(LogMutation logMutation) throws IOException {
+    pendingMutation = logMutation;
+    LOG.info(new GsonBuilder().serializeNulls().create().toJson(logMutation));
+    // Keep a snapshot so that confirmMutation(false) can roll back.
+    oldConf = new Configuration(schedConf);
+    Map<String, String> mutations = pendingMutation.getUpdates();
+    for (Map.Entry<String, String> kv : mutations.entrySet()) {
+      // A null value means "remove this key".
+      if (kv.getValue() == null) {
+        this.schedConf.unset(kv.getKey());
+      } else {
+        this.schedConf.set(kv.getKey(), kv.getValue());
+      }
+    }
+    tempConfigPath = writeTmpConfig(schedConf);
+  }
+
+  /**
+   * @param isValid if true, finalize temp configuration file
+   *                if false, remove temp configuration file and rollback
+   * @throws Exception throw IOE when the temp configuration file cannot be
+   *                   finalized or removed
+   */
+  @Override
+  public void confirmMutation(boolean isValid) throws Exception {
+    if (pendingMutation == null || tempConfigPath == null) {
+      LOG.warn("pendingMutation or tempConfigPath is null, do nothing");
+      return;
+    }
+    if (isValid) {
+      finalizeFileSystemFile();
+    } else {
+      schedConf = oldConf;
+      removeTmpConfigFile();
+    }
+    tempConfigPath = null;
+  }
+
+  /**
+   * Make the temp configuration file visible by renaming it to its final
+   * name. Callers must have set {@code tempConfigPath}.
+   *
+   * @throws IOException if the final name cannot be derived or the rename
+   *     does not succeed
+   */
+  private void finalizeFileSystemFile() throws IOException {
+    Path finalConfigPath = getFinalConfigPath(tempConfigPath);
+    if (finalConfigPath == null) {
+      throw new IOException("cannot derive final configuration path from "
+          + tempConfigPath);
+    }
+    // rename() signals some failures through its return value only; treating
+    // false as success would report a configuration as stored when it is not.
+    if (!fileSystem.rename(tempConfigPath, finalConfigPath)) {
+      throw new IOException("failed to rename " + tempConfigPath + " to "
+          + finalConfigPath);
+    }
+    LOG.info("finalize temp configuration file successfully, finalConfigPath="
+        + finalConfigPath);
+  }
+
+  /**
+   * Strip the ".tmp" suffix from a temp configuration path.
+   *
+   * @param tempPath path of a temp configuration file
+   * @return the sibling path without the suffix, or null when
+   *     {@code tempPath} does not end with the suffix
+   */
+  private Path getFinalConfigPath(Path tempPath) {
+    String tempConfigPathStr = tempPath.getName();
+    if (!tempConfigPathStr.endsWith(TMP)) {
+      LOG.warn(tempPath + " does not end with '"
+          + TMP + "' return null");
+      return null;
+    }
+    String finalConfigPathStr = tempConfigPathStr.substring(0,
+        (tempConfigPathStr.length() - TMP.length()));
+    return new Path(tempPath.getParent(), finalConfigPathStr);
+  }
+
+  /**
+   * Delete the temp configuration file of a rejected mutation. Callers must
+   * have set {@code tempConfigPath}.
+   *
+   * @throws IOException if the file system reports an error
+   */
+  private void removeTmpConfigFile() throws IOException {
+    if (fileSystem.delete(tempConfigPath, true)) {
+      LOG.info("delete temp configuration file: " + tempConfigPath);
+    } else {
+      LOG.warn("temp configuration file was not deleted: " + tempConfigPath);
+    }
+  }
+
+  /**
+   * Load the latest finalized configuration file and replace the in-memory
+   * configuration with it.
+   *
+   * @return the configuration read from the file system
+   * @throws IOException if no configuration file exists or it cannot be read
+   */
+  private Configuration getConfigurationFromFileSystem() throws IOException {
+    long start = Time.monotonicNow();
+
+    Configuration result = new Configuration(false);
+    // The resource is parsed lazily, on first access, so the copy loop has to
+    // run while the stream is still open. Closing here does not rely on
+    // Configuration closing the stream on our behalf.
+    try (InputStream configInputStream = getConfigFileInputStream()) {
+      if (configInputStream == null) {
+        throw new IOException(
+            "no capacity scheduler file in " + this.schedulerConfDir);
+      }
+
+      Configuration conf = new Configuration(false);
+      conf.addResource(configInputStream);
+      for (Map.Entry<String, String> entry : conf) {
+        result.set(entry.getKey(), entry.getValue());
+      }
+    }
+    LOG.info("upload conf from fileSystem took "
+            + (Time.monotonicNow() - start) + " ms");
+
+    //for ha transition, local schedConf may be old one.
+    this.schedConf = result;
+    return result;
+  }
+
+  /**
+   * Open the latest finalized configuration file.
+   *
+   * @return an open stream the caller must close, or null when the store
+   *     holds no finalized configuration file
+   * @throws IOException if listing the directory or opening the file fails
+   */
+  private InputStream getConfigFileInputStream() throws IOException {
+    Path latestConfigPath = getLatestConfigPath();
+    if (latestConfigPath == null) {
+      return null;
+    }
+    return fileSystem.open(latestConfigPath);
+  }
+
+  /**
+   * Find the finalized configuration file that sorts last in the store
+   * directory.
+   *
+   * @return its path, or null when there is no finalized configuration file
+   * @throws IOException if the directory cannot be listed
+   */
+  private Path getLatestConfigPath() throws IOException {
+    FileStatus[] fileStatuses = fileSystem.listStatus(this.schedulerConfDir,
+        this.configFilePathFilter);
+
+    if (fileStatuses == null || fileStatuses.length == 0) {
+      return null;
+    }
+    Arrays.sort(fileStatuses);
+
+    return fileStatuses[fileStatuses.length - 1].getPath();
+  }
+
+  /**
+   * Write {@code vSchedConf} to a new temp file in the store directory,
+   * deleting surplus finalized files first.
+   *
+   * @param vSchedConf configuration to persist
+   * @return path of the temp file that was written
+   * @throws IOException if the temp file cannot be created or written; the
+   *     incomplete temp file is then removed on a best-effort basis
+   */
+  private Path writeTmpConfig(Configuration vSchedConf) throws IOException {
+    long start = Time.monotonicNow();
+    String tempSchedulerConfigFile = YarnConfiguration.CS_CONFIGURATION_FILE
+        + "." + System.currentTimeMillis() + TMP;
+
+    Path tempSchedulerConfigPath = new Path(this.schedulerConfDir,
+        tempSchedulerConfigFile);
+
+    try (FSDataOutputStream outputStream = fileSystem.create(
+        tempSchedulerConfigPath)) {
+      //clean configuration file when num exceed maxVersion
+      cleanConfigurationFile();
+
+      vSchedConf.writeXml(outputStream);
+      LOG.info("write temp capacity configuration successfully, "
+          + "schedulerConfigFile=" + tempSchedulerConfigPath);
+    } catch (IOException e) {
+      LOG.error("write temp capacity configuration fail, schedulerConfigFile="
+          + tempSchedulerConfigPath, e);
+      // Temp files are never picked up by cleanConfigurationFile(), so an
+      // incomplete one would otherwise stay in the directory forever.
+      try {
+        fileSystem.delete(tempSchedulerConfigPath, false);
+      } catch (IOException cleanupFailure) {
+        LOG.warn("failed to delete incomplete temp configuration file "
+            + tempSchedulerConfigPath, cleanupFailure);
+      }
+      throw e;
+    }
+    LOG.info("write temp configuration to fileSystem took "
+        + (Time.monotonicNow() - start) + " ms");
+    return tempSchedulerConfigPath;
+  }
+
+  /**
+   * Persist {@code vSchedConf} as a finalized configuration file in one step
+   * (write temp file, then rename).
+   *
+   * @param vSchedConf configuration to persist
+   * @throws IOException if writing or finalizing the file fails
+   */
+  @VisibleForTesting
+  void writeConfigurationToFileSystem(Configuration vSchedConf)
+      throws IOException {
+    tempConfigPath = writeTmpConfig(vSchedConf);
+    finalizeFileSystemFile();
+  }
+
+  /**
+   * Delete the finalized configuration files that sort first until at most
+   * {@code maxVersion} of them remain.
+   *
+   * @throws IOException if the directory cannot be listed or a delete fails
+   *     with an error
+   */
+  private void cleanConfigurationFile() throws IOException {
+    FileStatus[] fileStatuses = fileSystem.listStatus(this.schedulerConfDir,
+        this.configFilePathFilter);
+
+    // A negative limit is treated as zero; otherwise the loop below would
+    // index past the end of the listing.
+    int retained = Math.max(this.maxVersion, 0);
+    if (fileStatuses == null || fileStatuses.length <= retained) {
+      return;
+    }
+    Arrays.sort(fileStatuses);
+    int surplus = fileStatuses.length - retained;
+    for (int i = 0; i < surplus; i++) {
+      Path stale = fileStatuses[i].getPath();
+      if (fileSystem.delete(stale, false)) {
+        LOG.info("delete config file " + stale);
+      } else {
+        LOG.warn("failed to delete config file " + stale);
+      }
+    }
+  }
+
+  /**
+   * Re-read the latest finalized configuration from the file system.
+   *
+   * @return the stored configuration
+   * @throws IOException if no configuration file exists or it cannot be read
+   */
+  @Override
+  public Configuration retrieve() throws IOException {
+    return getConfigurationFromFileSystem();
+  }
+
+  /**
+   * Not implemented by this store.
+   *
+   * @param fromId ignored
+   * @return always null
+   */
+  @Override
+  public List<LogMutation> getConfirmedConfHistory(long fromId) {
+    // Unimplemented.
+    return null;
+  }
+
+  /**
+   * This store does not persist a version.
+   *
+   * @return always null
+   */
+  @Override
+  protected Version getConfStoreVersion() throws Exception {
+    return null;
+  }
+
+  /** No-op: this store does not persist a version. */
+  @Override
+  protected void storeVersion() throws Exception {
+
+  }
+
+  @Override
+  protected Version getCurrentVersion() {
+    return CURRENT_VERSION_INFO;
+  }
+
+  /**
+   * Close the underlying file system, if one was obtained.
+   *
+   * @throws IOException if closing the file system fails
+   */
+  public void close() throws IOException {
+    // NOTE(review): the instance comes from Path#getFileSystem, which may
+    // hand out a cached, shared FileSystem; confirm that closing it here
+    // cannot affect other users of the same instance.
+    if (fileSystem != null) {
+      fileSystem.close();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1606dad1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java
index 40a19a4..9c3bf9d 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java
@@ -77,6 +77,9 @@ public class MutableCSConfigurationProvider implements 
CSConfigurationProvider,
     case YarnConfiguration.ZK_CONFIGURATION_STORE:
       this.confStore = new ZKConfigurationStore();
       break;
+    case YarnConfiguration.FS_CONFIGURATION_STORE:
+      this.confStore = new FSSchedulerConfigurationStore();
+      break;
     default:
       this.confStore = YarnConfigurationStoreFactory.getStore(config);
       break;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1606dad1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java
index 7fb52fc..ef0a44b 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java
@@ -123,7 +123,7 @@ public abstract class YarnConfigurationStore {
    * Retrieve the persisted configuration.
    * @return configuration as key-value
    */
-  public abstract Configuration retrieve();
+  public abstract Configuration retrieve() throws IOException;
 
   /**
    * Get a list of confirmed configuration mutations starting from a given id.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1606dad1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestFSSchedulerConfigurationStore.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestFSSchedulerConfigurationStore.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestFSSchedulerConfigurationStore.java
new file mode 100644
index 0000000..65314be
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestFSSchedulerConfigurationStore.java
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.YarnConfigurationStore.LogMutation;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests {@link FSSchedulerConfigurationStore}.
+ *
+ * <p>Each test works on a scratch directory located next to the compiled test
+ * classes; the directory is created in {@link #setUp()} and removed again in
+ * {@link #tearDown()}.
+ */
+public class TestFSSchedulerConfigurationStore {
+  private FSSchedulerConfigurationStore configurationStore;
+  private Configuration conf;
+  private File testSchedulerConfigurationDir;
+
+  @Before
+  public void setUp() throws Exception {
+    configurationStore = new FSSchedulerConfigurationStore();
+    testSchedulerConfigurationDir = new File(
+        TestFSSchedulerConfigurationStore.class.getResource("").getPath()
+            + FSSchedulerConfigurationStore.class.getSimpleName());
+    testSchedulerConfigurationDir.mkdirs();
+
+    conf = new Configuration();
+    conf.set(YarnConfiguration.SCHEDULER_CONFIGURATION_FS_PATH,
+        testSchedulerConfigurationDir.getAbsolutePath());
+  }
+
+  /**
+   * Write {@code config} into the scratch directory as a finalized
+   * configuration file, named like the files the store itself produces.
+   */
+  private void writeConf(Configuration config) throws IOException {
+    FileSystem fileSystem = FileSystem.get(new Configuration(config));
+    String schedulerConfigurationFile = YarnConfiguration.CS_CONFIGURATION_FILE
+        + "." + System.currentTimeMillis();
+    // try-with-resources so the stream is closed even when writeXml throws.
+    try (FSDataOutputStream outputStream = fileSystem.create(
+        new Path(testSchedulerConfigurationDir.getAbsolutePath(),
+            schedulerConfigurationFile))) {
+      config.writeXml(outputStream);
+    }
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    FileUtils.deleteDirectory(testSchedulerConfigurationDir);
+  }
+
+  /** A confirmed mutation must be visible in the retrieved configuration. */
+  @Test
+  public void confirmMutationWithValid() throws Exception {
+    conf.setInt(
+        YarnConfiguration.SCHEDULER_CONFIGURATION_FS_MAX_VERSION, 2);
+    conf.set("a", "a");
+    conf.set("b", "b");
+    conf.set("c", "c");
+    writeConf(conf);
+    configurationStore.initialize(conf, conf, null);
+    Configuration storeConf = configurationStore.retrieve();
+    compareConfig(conf, storeConf);
+
+    // A null value removes the key, a non-null value overwrites it.
+    Map<String, String> updates = new HashMap<>();
+    updates.put("a", null);
+    updates.put("b", "bb");
+
+    Configuration expectConfig = new Configuration(conf);
+    expectConfig.unset("a");
+    expectConfig.set("b", "bb");
+
+    LogMutation logMutation = new LogMutation(updates, "test");
+    configurationStore.logMutation(logMutation);
+    configurationStore.confirmMutation(true);
+    storeConf = configurationStore.retrieve();
+    assertEquals(null, storeConf.get("a"));
+    assertEquals("bb", storeConf.get("b"));
+    assertEquals("c", storeConf.get("c"));
+
+    compareConfig(expectConfig, storeConf);
+
+    // Second round: the same mutation object is logged again after its
+    // update map has been changed.
+    updates.put("b", "bbb");
+    configurationStore.logMutation(logMutation);
+    configurationStore.confirmMutation(true);
+    storeConf = configurationStore.retrieve();
+    assertEquals(null, storeConf.get("a"));
+    assertEquals("bbb", storeConf.get("b"));
+    assertEquals("c", storeConf.get("c"));
+  }
+
+  /** A rejected mutation must leave the stored configuration untouched. */
+  @Test
+  public void confirmMutationWithInValid() throws Exception {
+    conf.set("a", "a");
+    conf.set("b", "b");
+    conf.set("c", "c");
+    writeConf(conf);
+    configurationStore.initialize(conf, conf, null);
+    Configuration storeConf = configurationStore.retrieve();
+    compareConfig(conf, storeConf);
+
+    Map<String, String> updates = new HashMap<>();
+    updates.put("a", null);
+    updates.put("b", "bb");
+
+    LogMutation logMutation = new LogMutation(updates, "test");
+    configurationStore.logMutation(logMutation);
+    configurationStore.confirmMutation(false);
+    storeConf = configurationStore.retrieve();
+
+    compareConfig(conf, storeConf);
+  }
+
+  /** The store must return the configuration file already in its directory. */
+  @Test
+  public void retrieve() throws Exception {
+    Configuration schedulerConf = new Configuration();
+    schedulerConf.set("a", "a");
+    schedulerConf.setLong("long", 1L);
+    schedulerConf.setBoolean("boolean", true);
+    writeConf(schedulerConf);
+
+    configurationStore.initialize(conf, conf, null);
+    Configuration storedConfig = configurationStore.retrieve();
+
+    compareConfig(schedulerConf, storedConfig);
+  }
+
+  /** checkVersion must not throw for a freshly constructed store. */
+  @Test
+  public void checkVersion() {
+    try {
+      configurationStore.checkVersion();
+    } catch (Exception e) {
+      fail("checkVersion throw exception");
+    }
+  }
+
+  /**
+   * Assert that both configurations hold the same value for every key that
+   * appears in either of them.
+   */
+  private void compareConfig(Configuration schedulerConf,
+      Configuration storedConfig) {
+    for (Map.Entry<String, String> entry : schedulerConf) {
+      assertEquals(entry.getKey(), schedulerConf.get(entry.getKey()),
+          storedConfig.get(entry.getKey()));
+    }
+
+    for (Map.Entry<String, String> entry : storedConfig) {
+      assertEquals(entry.getKey(), storedConfig.get(entry.getKey()),
+          schedulerConf.get(entry.getKey()));
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1606dad1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestMutableCSConfigurationProvider.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestMutableCSConfigurationProvider.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestMutableCSConfigurationProvider.java
index 5d43ebb..81bc7a7 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestMutableCSConfigurationProvider.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestMutableCSConfigurationProvider.java
@@ -18,7 +18,11 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
 
+import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.AdminService;
@@ -30,6 +34,8 @@ import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.io.File;
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -99,4 +105,48 @@ public class TestMutableCSConfigurationProvider {
     assertNull(confProvider.loadConfiguration(conf).get(
         "yarn.scheduler.capacity.root.a.badKey"));
   }
+
+  /**
+   * Runs the provider against the "fs" configuration store, using a scratch
+   * directory next to the compiled test classes as the store location. A
+   * confirmed update must become visible through loadConfiguration, a
+   * rejected one must not.
+   */
+  @Test
+  public void testHDFSBackedProvider() throws Exception {
+    final String goodKey = "yarn.scheduler.capacity.root.a.goodKey";
+    final String badKey = "yarn.scheduler.capacity.root.a.badKey";
+
+    // Start from an empty store directory.
+    String classDir =
+        TestMutableCSConfigurationProvider.class.getResource("").getPath();
+    File storeDir = new File(classDir
+        + TestMutableCSConfigurationProvider.class.getSimpleName());
+    FileUtils.deleteDirectory(storeDir);
+    storeDir.mkdirs();
+    String storePath = storeDir.getAbsolutePath();
+
+    Configuration conf = new Configuration(false);
+    conf.set(YarnConfiguration.SCHEDULER_CONFIGURATION_STORE_CLASS,
+        YarnConfiguration.FS_CONFIGURATION_STORE);
+    conf.set(YarnConfiguration.SCHEDULER_CONFIGURATION_FS_PATH, storePath);
+    writeConf(conf, storePath);
+
+    confProvider.init(conf);
+    assertNull(confProvider.loadConfiguration(conf).get(goodKey));
+
+    // Confirmed mutation: the new value has to show up.
+    confProvider.logAndApplyMutation(TEST_USER, goodUpdate);
+    confProvider.confirmPendingMutation(true);
+    assertEquals("goodVal", confProvider.loadConfiguration(conf).get(goodKey));
+
+    // Rejected mutation: the key has to stay absent before and after.
+    assertNull(confProvider.loadConfiguration(conf).get(badKey));
+    confProvider.logAndApplyMutation(TEST_USER, badUpdate);
+    confProvider.confirmPendingMutation(false);
+    assertNull(confProvider.loadConfiguration(conf).get(badKey));
+  }
+
+  /**
+   * Writes {@code conf} as XML into {@code storePath}, using the capacity
+   * scheduler file name followed by "." and the current time in milliseconds.
+   */
+  private void writeConf(Configuration conf, String storePath)
+      throws IOException {
+    String fileName = YarnConfiguration.CS_CONFIGURATION_FILE + "."
+        + System.currentTimeMillis();
+    Path target = new Path(storePath, fileName);
+    FileSystem fs = FileSystem.get(new Configuration(conf));
+    try (FSDataOutputStream out = fs.create(target)) {
+      conf.writeXml(out);
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to