Repository: hadoop Updated Branches: refs/heads/trunk 6542d17ea -> 1606dad13
YARN-7497. Add file system based scheduler configuration store. Contributed by Jiandan Yang. Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/1606dad1 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/1606dad1 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/1606dad1 Branch: refs/heads/trunk Commit: 1606dad133de0dbee59175509a22994ec570ea41 Parents: 6542d17 Author: Weiwei Yang <w...@apache.org> Authored: Fri Mar 30 21:41:33 2018 +0800 Committer: Weiwei Yang <w...@apache.org> Committed: Fri Mar 30 21:41:33 2018 +0800 ---------------------------------------------------------------------- .../hadoop/yarn/conf/YarnConfiguration.java | 16 +- .../src/main/resources/yarn-default.xml | 22 ++ .../scheduler/capacity/CapacityScheduler.java | 1 + .../conf/FSSchedulerConfigurationStore.java | 303 +++++++++++++++++++ .../conf/MutableCSConfigurationProvider.java | 3 + .../capacity/conf/YarnConfigurationStore.java | 2 +- .../conf/TestFSSchedulerConfigurationStore.java | 173 +++++++++++ .../TestMutableCSConfigurationProvider.java | 50 +++ 8 files changed, 568 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/1606dad1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 1f62bbd..42f2cae 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java 
@@ -776,12 +776,15 @@ public class YarnConfiguration extends Configuration { public static final String MEMORY_CONFIGURATION_STORE = "memory"; @Private @Unstable - public static final String LEVELDB_CONFIGURATION_STORE = "leveldb"; + public static final String FS_CONFIGURATION_STORE = "fs"; @Private @Unstable public static final String ZK_CONFIGURATION_STORE = "zk"; @Private @Unstable + public static final String LEVELDB_CONFIGURATION_STORE = "leveldb"; + @Private + @Unstable public static final String DEFAULT_CONFIGURATION_STORE = FILE_CONFIGURATION_STORE; @Private @@ -809,6 +812,17 @@ public class YarnConfiguration extends Configuration { @Private @Unstable public static final long DEFAULT_RM_SCHEDCONF_ZK_MAX_LOGS = 1000; + @Private + @Unstable + public static final String SCHEDULER_CONFIGURATION_FS_PATH = + YARN_PREFIX + "scheduler.configuration.fs.path"; + @Private + @Unstable + public static final String SCHEDULER_CONFIGURATION_FS_MAX_VERSION = + YARN_PREFIX + "scheduler.configuration.max.version"; + @Private + @Unstable + public static final int DEFAULT_SCHEDULER_CONFIGURATION_FS_MAX_VERSION = 100; /** Parent znode path under which ZKConfigurationStore will create znodes. */ @Private http://git-wip-us.apache.org/repos/asf/hadoop/blob/1606dad1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 114ba4b..81b6658 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -3570,6 +3570,28 @@ <property> <description> + The file system directory to store the configuration files. 
The path + can be in any format as long as it uses a Hadoop-compatible scheme; + for example, the value "file:///path/to/dir" stores files on the local + file system, and the value "hdfs:///path/to/dir" stores files on HDFS. + If resource manager HA is enabled, it is recommended to use the hdfs scheme so + that it works in fail-over scenarios. + </description> + <name>yarn.scheduler.configuration.fs.path</name> + <value>file://${hadoop.tmp.dir}/yarn/system/schedconf</value> + </property> + + <property> + <description> + The max number of configuration files kept in the file system. + Default is 100. + </description> + <name>yarn.scheduler.configuration.max.version</name> + <value>100</value> + </property> + + <property> + <description> ZK root node path for configuration store when using zookeeper-based configuration store. </description> http://git-wip-us.apache.org/repos/asf/hadoop/blob/1606dad1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index bf674a8..e59bdde 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -326,6 +326,7 @@ public class 
CapacityScheduler extends case YarnConfiguration.MEMORY_CONFIGURATION_STORE: case YarnConfiguration.LEVELDB_CONFIGURATION_STORE: case YarnConfiguration.ZK_CONFIGURATION_STORE: + case YarnConfiguration.FS_CONFIGURATION_STORE: this.csConfProvider = new MutableCSConfigurationProvider(rmContext); break; default: http://git-wip-us.apache.org/repos/asf/hadoop/blob/1606dad1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/FSSchedulerConfigurationStore.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/FSSchedulerConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/FSSchedulerConfigurationStore.java new file mode 100644 index 0000000..2a24887 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/FSSchedulerConfigurationStore.java @@ -0,0 +1,303 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +import com.google.common.annotations.VisibleForTesting; +import com.google.gson.GsonBuilder; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.util.Time; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.records.Version; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; + + +/** + * A filesystem implementation of {@link YarnConfigurationStore}. 
Offer + * configuration storage in FileSystem + */ +public class FSSchedulerConfigurationStore extends YarnConfigurationStore { + public static final Log LOG = LogFactory.getLog( + FSSchedulerConfigurationStore.class); + + @VisibleForTesting + protected static final Version CURRENT_VERSION_INFO + = Version.newInstance(0, 1); + + private static final String TMP = ".tmp"; + + private int maxVersion; + private Path schedulerConfDir; + private FileSystem fileSystem; + private LogMutation pendingMutation; + private PathFilter configFilePathFilter; + private volatile Configuration schedConf; + private volatile Configuration oldConf; + private Path tempConfigPath; + + @Override + public void initialize(Configuration conf, Configuration vSchedConf, + RMContext rmContext) throws Exception { + this.configFilePathFilter = new PathFilter() { + @Override + public boolean accept(Path path) { + if (path == null) { + return false; + } + String pathName = path.getName(); + return pathName.startsWith(YarnConfiguration.CS_CONFIGURATION_FILE) + && !pathName.endsWith(TMP); + } + }; + + String schedulerConfPathStr = conf.get( + YarnConfiguration.SCHEDULER_CONFIGURATION_FS_PATH); + if (schedulerConfPathStr == null || schedulerConfPathStr.isEmpty()) { + throw new IOException( + YarnConfiguration.SCHEDULER_CONFIGURATION_FS_PATH + + " must be set"); + } + this.schedulerConfDir = new Path(schedulerConfPathStr); + this.fileSystem = this.schedulerConfDir.getFileSystem(conf); + this.maxVersion = conf.getInt( + YarnConfiguration.SCHEDULER_CONFIGURATION_FS_MAX_VERSION, + YarnConfiguration.DEFAULT_SCHEDULER_CONFIGURATION_FS_MAX_VERSION); + LOG.info("schedulerConfDir=" + schedulerConfPathStr); + LOG.info("capacity scheduler file max version = " + maxVersion); + + if (!fileSystem.exists(schedulerConfDir)) { + if (!fileSystem.mkdirs(schedulerConfDir)) { + throw new IOException("mkdir " + schedulerConfPathStr + " failed"); + } + } + + // create capacity-schedule.xml.ts file if not existing + if 
(this.getConfigFileInputStream() == null) { + writeConfigurationToFileSystem(vSchedConf); + } + + this.schedConf = this.getConfigurationFromFileSystem(); + } + + /** + * Update and persist latest configuration in temp file. + * @param logMutation configuration change to be persisted in write ahead log + * @throws IOException throw IOE when write temp configuration file fail + */ + @Override + public void logMutation(LogMutation logMutation) throws IOException { + pendingMutation = logMutation; + LOG.info(new GsonBuilder().serializeNulls().create().toJson(logMutation)); + oldConf = new Configuration(schedConf); + Map<String, String> mutations = pendingMutation.getUpdates(); + for (Map.Entry<String, String> kv : mutations.entrySet()) { + if (kv.getValue() == null) { + this.schedConf.unset(kv.getKey()); + } else { + this.schedConf.set(kv.getKey(), kv.getValue()); + } + } + tempConfigPath = writeTmpConfig(schedConf); + } + + /** + * @param isValid if true, finalize temp configuration file + * if false, remove temp configuration file and rollback + * @throws Exception throw IOE when write temp configuration file fail + */ + @Override + public void confirmMutation(boolean isValid) throws Exception { + if (pendingMutation == null || tempConfigPath == null) { + LOG.warn("pendingMutation or tempConfigPath is null, do nothing"); + return; + } + if (isValid) { + finalizeFileSystemFile(); + } else { + schedConf = oldConf; + removeTmpConfigFile(); + } + tempConfigPath = null; + } + + private void finalizeFileSystemFile() throws IOException { + // call confirmMutation() make sure tempConfigPath is not null + Path finalConfigPath = getFinalConfigPath(tempConfigPath); + fileSystem.rename(tempConfigPath, finalConfigPath); + LOG.info("finalize temp configuration file successfully, finalConfigPath=" + + finalConfigPath); + } + + private Path getFinalConfigPath(Path tempPath) { + String tempConfigPathStr = tempPath.getName(); + if (!tempConfigPathStr.endsWith(TMP)) { + 
LOG.warn(tempPath + " does not end with '" + + TMP + "' return null"); + return null; + } + String finalConfigPathStr = tempConfigPathStr.substring(0, + (tempConfigPathStr.length() - TMP.length())); + return new Path(tempPath.getParent(), finalConfigPathStr); + } + + private void removeTmpConfigFile() throws IOException { + // call confirmMutation() make sure tempConfigPath is not null + fileSystem.delete(tempConfigPath, true); + LOG.info("delete temp configuration file: " + tempConfigPath); + } + + private Configuration getConfigurationFromFileSystem() throws IOException { + long start = Time.monotonicNow(); + + Configuration conf = new Configuration(false); + InputStream configInputStream = getConfigFileInputStream(); + if (configInputStream == null) { + throw new IOException( + "no capacity scheduler file in " + this.schedulerConfDir); + } + + conf.addResource(configInputStream); + Configuration result = new Configuration(false); + for (Map.Entry<String, String> entry : conf) { + result.set(entry.getKey(), entry.getValue()); + } + LOG.info("upload conf from fileSystem took " + + (Time.monotonicNow() - start) + " ms"); + + //for ha transition, local schedConf may be old one. 
+ this.schedConf = result; + return result; + } + + private InputStream getConfigFileInputStream() throws IOException { + Path lastestConfigPath = getLatestConfigPath(); + if (lastestConfigPath == null) { + return null; + } + return fileSystem.open(lastestConfigPath); + } + + private Path getLatestConfigPath() throws IOException { + FileStatus[] fileStatuses = fileSystem.listStatus(this.schedulerConfDir, + this.configFilePathFilter); + + if (fileStatuses == null || fileStatuses.length == 0) { + return null; + } + Arrays.sort(fileStatuses); + + return fileStatuses[fileStatuses.length - 1].getPath(); + } + + @VisibleForTesting + private Path writeTmpConfig(Configuration vSchedConf) throws IOException { + long start = Time.monotonicNow(); + String tempSchedulerConfigFile = YarnConfiguration.CS_CONFIGURATION_FILE + + "." + System.currentTimeMillis() + TMP; + + Path tempSchedulerConfigPath = new Path(this.schedulerConfDir, + tempSchedulerConfigFile); + + try (FSDataOutputStream outputStream = fileSystem.create( + tempSchedulerConfigPath)) { + //clean configuration file when num exceed maxVersion + cleanConfigurationFile(); + + vSchedConf.writeXml(outputStream); + LOG.info( + "write temp capacity configuration successfully, schedulerConfigFile=" + + tempSchedulerConfigPath); + } catch (IOException e) { + LOG.info("write temp capacity configuration fail, schedulerConfigFile=" + + tempSchedulerConfigPath, e); + throw e; + } + LOG.info("write temp configuration to fileSystem took " + + (Time.monotonicNow() - start) + " ms"); + return tempSchedulerConfigPath; + } + + @VisibleForTesting + void writeConfigurationToFileSystem(Configuration vSchedConf) + throws IOException { + tempConfigPath = writeTmpConfig(vSchedConf); + finalizeFileSystemFile(); + } + + private void cleanConfigurationFile() throws IOException { + FileStatus[] fileStatuses = fileSystem.listStatus(this.schedulerConfDir, + this.configFilePathFilter); + + if (fileStatuses == null || fileStatuses.length <= 
this.maxVersion) { + return; + } + Arrays.sort(fileStatuses); + int configFileNum = fileStatuses.length; + if (fileStatuses.length > this.maxVersion) { + for (int i = 0; i < configFileNum - this.maxVersion; i++) { + fileSystem.delete(fileStatuses[i].getPath(), false); + LOG.info("delete config file " + fileStatuses[i].getPath()); + } + } + } + + @Override + public Configuration retrieve() throws IOException { + return getConfigurationFromFileSystem(); + } + + @Override + public List<LogMutation> getConfirmedConfHistory(long fromId) { + // Unimplemented. + return null; + } + + @Override + protected Version getConfStoreVersion() throws Exception { + return null; + } + + @Override + protected void storeVersion() throws Exception { + + } + + @Override + protected Version getCurrentVersion() { + return CURRENT_VERSION_INFO; + } + + public void close() throws IOException { + if (fileSystem != null) { + fileSystem.close(); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/1606dad1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java index 40a19a4..9c3bf9d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java @@ -77,6 +77,9 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider, case YarnConfiguration.ZK_CONFIGURATION_STORE: this.confStore = new ZKConfigurationStore(); break; + case YarnConfiguration.FS_CONFIGURATION_STORE: + this.confStore = new FSSchedulerConfigurationStore(); + break; default: this.confStore = YarnConfigurationStoreFactory.getStore(config); break; http://git-wip-us.apache.org/repos/asf/hadoop/blob/1606dad1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java index 7fb52fc..ef0a44b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/YarnConfigurationStore.java @@ -123,7 +123,7 @@ public abstract class YarnConfigurationStore { * Retrieve the persisted configuration. 
* @return configuration as key-value */ - public abstract Configuration retrieve(); + public abstract Configuration retrieve() throws IOException; /** * Get a list of confirmed configuration mutations starting from a given id. http://git-wip-us.apache.org/repos/asf/hadoop/blob/1606dad1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestFSSchedulerConfigurationStore.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestFSSchedulerConfigurationStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestFSSchedulerConfigurationStore.java new file mode 100644 index 0000000..65314be --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestFSSchedulerConfigurationStore.java @@ -0,0 +1,173 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf; + +import java.io.File; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.YarnConfigurationStore.LogMutation; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +/** + * Tests {@link FSSchedulerConfigurationStore}. + */ +public class TestFSSchedulerConfigurationStore { + private FSSchedulerConfigurationStore configurationStore; + private Configuration conf; + private File testSchedulerConfigurationDir; + + @Before + public void setUp() throws Exception { + configurationStore = new FSSchedulerConfigurationStore(); + testSchedulerConfigurationDir = new File( + TestFSSchedulerConfigurationStore.class.getResource("").getPath() + + FSSchedulerConfigurationStore.class.getSimpleName()); + testSchedulerConfigurationDir.mkdirs(); + + conf = new Configuration(); + conf.set(YarnConfiguration.SCHEDULER_CONFIGURATION_FS_PATH, + testSchedulerConfigurationDir.getAbsolutePath()); + } + + private void writeConf(Configuration config) throws IOException { + FileSystem fileSystem = FileSystem.get(new Configuration(config)); + String schedulerConfigurationFile = YarnConfiguration.CS_CONFIGURATION_FILE + + "." 
+ System.currentTimeMillis(); + FSDataOutputStream outputStream = fileSystem.create( + new Path(testSchedulerConfigurationDir.getAbsolutePath(), + schedulerConfigurationFile)); + config.writeXml(outputStream); + outputStream.close(); + } + + @After + public void tearDown() throws Exception { + FileUtils.deleteDirectory(testSchedulerConfigurationDir); + } + + @Test + public void confirmMutationWithValid() throws Exception { + conf.setInt( + YarnConfiguration.SCHEDULER_CONFIGURATION_FS_MAX_VERSION, 2); + conf.set("a", "a"); + conf.set("b", "b"); + conf.set("c", "c"); + writeConf(conf); + configurationStore.initialize(conf, conf, null); + Configuration storeConf = configurationStore.retrieve(); + compareConfig(conf, storeConf); + + Map<String, String> updates = new HashMap<>(); + updates.put("a", null); + updates.put("b", "bb"); + + Configuration expectConfig = new Configuration(conf); + expectConfig.unset("a"); + expectConfig.set("b", "bb"); + + LogMutation logMutation = new LogMutation(updates, "test"); + configurationStore.logMutation(logMutation); + configurationStore.confirmMutation(true); + storeConf = configurationStore.retrieve(); + assertEquals(null, storeConf.get("a")); + assertEquals("bb", storeConf.get("b")); + assertEquals("c", storeConf.get("c")); + + compareConfig(expectConfig, storeConf); + + updates.put("b", "bbb"); + configurationStore.logMutation(logMutation); + configurationStore.confirmMutation(true); + storeConf = configurationStore.retrieve(); + assertEquals(null, storeConf.get("a")); + assertEquals("bbb", storeConf.get("b")); + assertEquals("c", storeConf.get("c")); + } + + @Test + public void confirmMutationWithInValid() throws Exception { + conf.set("a", "a"); + conf.set("b", "b"); + conf.set("c", "c"); + writeConf(conf); + configurationStore.initialize(conf, conf, null); + Configuration storeConf = configurationStore.retrieve(); + compareConfig(conf, storeConf); + + Map<String, String> updates = new HashMap<>(); + updates.put("a", null); + 
updates.put("b", "bb"); + + LogMutation logMutation = new LogMutation(updates, "test"); + configurationStore.logMutation(logMutation); + configurationStore.confirmMutation(false); + storeConf = configurationStore.retrieve(); + + compareConfig(conf, storeConf); + } + + @Test + public void retrieve() throws Exception { + Configuration schedulerConf = new Configuration(); + schedulerConf.set("a", "a"); + schedulerConf.setLong("long", 1L); + schedulerConf.setBoolean("boolean", true); + writeConf(schedulerConf); + + configurationStore.initialize(conf, conf, null); + Configuration storedConfig = configurationStore.retrieve(); + + compareConfig(schedulerConf, storedConfig); + } + + @Test + public void checkVersion() { + try { + configurationStore.checkVersion(); + } catch (Exception e) { + fail("checkVersion throw exception"); + } + } + + private void compareConfig(Configuration schedulerConf, + Configuration storedConfig) { + for (Map.Entry<String, String> entry : schedulerConf) { + assertEquals(entry.getKey(), schedulerConf.get(entry.getKey()), + storedConfig.get(entry.getKey())); + } + + for (Map.Entry<String, String> entry : storedConfig) { + assertEquals(entry.getKey(), storedConfig.get(entry.getKey()), + schedulerConf.get(entry.getKey())); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/1606dad1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestMutableCSConfigurationProvider.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestMutableCSConfigurationProvider.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestMutableCSConfigurationProvider.java index 5d43ebb..81bc7a7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestMutableCSConfigurationProvider.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestMutableCSConfigurationProvider.java @@ -18,7 +18,11 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf; +import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.AdminService; @@ -30,6 +34,8 @@ import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo; import org.junit.Before; import org.junit.Test; +import java.io.File; +import java.io.IOException; import java.util.HashMap; import java.util.Map; @@ -99,4 +105,48 @@ public class TestMutableCSConfigurationProvider { assertNull(confProvider.loadConfiguration(conf).get( "yarn.scheduler.capacity.root.a.badKey")); } + + @Test + public void testHDFSBackedProvider() throws Exception { + File testSchedulerConfigurationDir = new File( + TestMutableCSConfigurationProvider.class.getResource("").getPath() + + TestMutableCSConfigurationProvider.class.getSimpleName()); + FileUtils.deleteDirectory(testSchedulerConfigurationDir); + testSchedulerConfigurationDir.mkdirs(); + + Configuration conf = new Configuration(false); + 
conf.set(YarnConfiguration.SCHEDULER_CONFIGURATION_STORE_CLASS, + YarnConfiguration.FS_CONFIGURATION_STORE); + conf.set(YarnConfiguration.SCHEDULER_CONFIGURATION_FS_PATH, + testSchedulerConfigurationDir.getAbsolutePath()); + writeConf(conf, testSchedulerConfigurationDir.getAbsolutePath()); + + confProvider.init(conf); + assertNull(confProvider.loadConfiguration(conf) + .get("yarn.scheduler.capacity.root.a.goodKey")); + + confProvider.logAndApplyMutation(TEST_USER, goodUpdate); + confProvider.confirmPendingMutation(true); + assertEquals("goodVal", confProvider.loadConfiguration(conf) + .get("yarn.scheduler.capacity.root.a.goodKey")); + + assertNull(confProvider.loadConfiguration(conf).get( + "yarn.scheduler.capacity.root.a.badKey")); + confProvider.logAndApplyMutation(TEST_USER, badUpdate); + confProvider.confirmPendingMutation(false); + assertNull(confProvider.loadConfiguration(conf).get( + "yarn.scheduler.capacity.root.a.badKey")); + + } + + private void writeConf(Configuration conf, String storePath) + throws IOException { + FileSystem fileSystem = FileSystem.get(new Configuration(conf)); + String schedulerConfigurationFile = YarnConfiguration.CS_CONFIGURATION_FILE + + "." + System.currentTimeMillis(); + try (FSDataOutputStream outputStream = fileSystem.create( + new Path(storePath, schedulerConfigurationFile))) { + conf.writeXml(outputStream); + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org