Author: ddas
Date: Mon Jul 7 03:41:50 2008
New Revision: 674442
URL: http://svn.apache.org/viewvc?rev=674442&view=rev
Log:
HADOOP-3479. Defines the configuration file for the resource manager in Hadoop.
You can configure various parameters related to scheduling, such as queues and
queue properties here. The properties for a queue follow a naming
convention, such as hadoop.rm.queue.queue-name.property-name. Contributed by
Hemanth Yamijala.
Added:
hadoop/core/trunk/conf/resource-manager-conf.xml
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ResourceManagerConf.java
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestResourceManagerConf.java
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/core/org/apache/hadoop/conf/Configuration.java
hadoop/core/trunk/src/test/org/apache/hadoop/conf/TestConfiguration.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=674442&r1=674441&r2=674442&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Mon Jul 7 03:41:50 2008
@@ -37,6 +37,12 @@
stream.reduce.output.field.separator
All of them default to "\t". (Zheng Shao via omalley)
+ HADOOP-3479. Defines the configuration file for the resource manager in
+ Hadoop. You can configure various parameters related to scheduling, such
+ as queues and queue properties here. The properties for a queue follow a
+ naming convention, such as hadoop.rm.queue.queue-name.property-name.
+ (Hemanth Yamijala via ddas)
+
IMPROVEMENTS
HADOOP-3577. Tools to inject blocks into name node and simulated
Added: hadoop/core/trunk/conf/resource-manager-conf.xml
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/conf/resource-manager-conf.xml?rev=674442&view=auto
==============================================================================
--- hadoop/core/trunk/conf/resource-manager-conf.xml (added)
+++ hadoop/core/trunk/conf/resource-manager-conf.xml Mon Jul 7 03:41:50 2008
@@ -0,0 +1,86 @@
+<?xml version="1.0"?>
+
+<!-- This is the configuration file for the resource manager in Hadoop. -->
+<!-- You can configure various parameters related to scheduling, such as -->
+<!-- queues and queue properties here. -->
+<!-- The properties for a queue follow a naming convention, such as -->
+<!-- hadoop.rm.queue.queue-name.property-name. -->
+
+<configuration>
+
+ <property>
+ <name>hadoop.rm.queue.names</name>
+ <value>default</value>
+ <description>Comma separated list of queue names that are configured
+ for this installation. Properties for a queue in this list will be
+ listed as hadoop.rm.queue.queue-name.property-name.</description>
+ </property>
+
+ <property>
+ <name>hadoop.rm.queue.default.guaranteed-capacity-maps</name>
+ <value></value>
+ <description>Guaranteed number of maps that will be available
+ for jobs in this queue.
+ </description>
+ </property>
+
+ <property>
+ <name>hadoop.rm.queue.default.guaranteed-capacity-reduces</name>
+ <value></value>
+ <description>Guaranteed number of reduces that will be available
+ for jobs in this queue.
+ </description>
+ </property>
+
+ <property>
+ <name>hadoop.rm.queue.default.reclaim-time-limit</name>
+ <value>0</value>
+ <description>The amount of time in seconds before which
+ resources distributed to other queues will be reclaimed.
+ </description>
+ </property>
+
+ <property>
+ <name>hadoop.rm.queue.default.allowed-users</name>
+ <value></value>
+ <description>Comma separated list of users who can submit jobs
+ to this queue. If empty, it means there are no ACLs
+ configured.
+ </description>
+ </property>
+
+ <property>
+ <name>hadoop.rm.queue.default.denied-users</name>
+ <value></value>
+ <description>Comma separated list of users who cannot submit
+ jobs to this queue. If empty, it means there are no ACLs
+ configured.
+ </description>
+ </property>
+
+ <property>
+ <name>hadoop.rm.queue.default.allowed-users-override</name>
+ <value>false</value>
+ <description>If true, specifies that users in the allowed
+ list will be able to submit, even if their names are in
+ the denied list.
+ </description>
+ </property>
+
+ <property>
+ <name>hadoop.rm.queue.default.supports-priority</name>
+ <value>true</value>
+ <description>If true, priorities of jobs will be taken into
+ account in scheduling decisions.
+ </description>
+ </property>
+
+ <property>
+ <name>hadoop.rm.queue.default.minimum-user-limit-percent</name>
+ <value>100</value>
+ <description>The minimum percentage of the cluster which can
+ be utilized by a single user.
+ </description>
+ </property>
+
+</configuration>
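
For illustration, a minimal sketch of how the properties above can be read through the ResourceManagerConf class added later in this commit. The sketch itself (the class ResourceManagerConfExample and its main method) is not part of the patch; it assumes resource-manager-conf.xml is on the classpath, and it lives in org.apache.hadoop.mapred because ResourceManagerConf is package-private:

    package org.apache.hadoop.mapred;

    import java.io.PrintWriter;

    public class ResourceManagerConfExample {
      public static void main(String[] args) {
        // Reads resource-manager-conf.xml (RM_CONF_FILE) from the classpath.
        ResourceManagerConf conf = new ResourceManagerConf();

        for (String queue : conf.getQueues()) {
          System.out.println(queue + ": guaranteed maps = "
              + conf.getGuaranteedCapacityMaps(queue)
              + ", supports priority = " + conf.isPrioritySupported(queue));
        }

        // Dump everything in the format produced by printConfiguration().
        conf.printConfiguration(new PrintWriter(System.out, true));
      }
    }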
Modified: hadoop/core/trunk/src/core/org/apache/hadoop/conf/Configuration.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/conf/Configuration.java?rev=674442&r1=674441&r2=674442&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/conf/Configuration.java (original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/conf/Configuration.java Mon Jul 7 03:41:50 2008
@@ -72,8 +72,8 @@
* <code>Path</code>, then the local filesystem is examined directly, without
* referring to the classpath.
*
- * <p>Hadoop by default specifies two resources, loaded in-order from the
- * classpath: <ol>
+ * <p>Unless explicitly turned off, Hadoop by default specifies two
+ * resources, loaded in-order from the classpath: <ol>
* <li><tt><a href="{@docRoot}/../hadoop-default.html">hadoop-default.xml</a>
* </tt>: Read-only defaults for hadoop.</li>
* <li><tt>hadoop-site.xml</tt>: Site-specific configuration for a given hadoop
@@ -153,13 +153,26 @@
/** A new configuration. */
public Configuration() {
+ this(true);
+ }
+
+ /** A new configuration where the behavior of reading from the default
+ * resources can be turned off.
+ *
+ * If the parameter {@code loadDefaults} is false, the new instance
+ * will not load resources from the default files.
+ * @param loadDefaults specifies whether to load from the default files
+ */
+ public Configuration(boolean loadDefaults) {
if (LOG.isDebugEnabled()) {
LOG.debug(StringUtils.stringifyException(new IOException("config()")));
}
- resources.add("hadoop-default.xml");
- resources.add("hadoop-site.xml");
+ if (loadDefaults) {
+ resources.add("hadoop-default.xml");
+ resources.add("hadoop-site.xml");
+ }
}
-
+
/**
* A new configuration with the same settings cloned from another.
*
@@ -189,7 +202,7 @@
* with that name.
*/
public void addResource(String name) {
- addResource(resources, name);
+ addResourceObject(name);
}
/**
@@ -203,7 +216,7 @@
* the classpath.
*/
public void addResource(URL url) {
- addResource(resources, url);
+ addResourceObject(url);
}
/**
@@ -217,17 +230,27 @@
* the classpath.
*/
public void addResource(Path file) {
- addResource(resources, file);
+ addResourceObject(file);
}
- private synchronized void addResource(ArrayList<Object> resources,
- Object resource) {
-
- resources.add(resource); // add to resources
+ /**
+ * Reload configuration from previously added resources.
+ *
+ * This method will clear all the configuration read from the added
+ * resources, and final parameters. This will cause the resources to
+ * be read again before the values are accessed. Values that are added
+ * via set methods will overlay values read from the resources.
+ */
+ public synchronized void reloadConfiguration() {
properties = null; // trigger reload
finalParameters.clear(); // clear site-limits
}
+ private synchronized void addResourceObject(Object resource) {
+ resources.add(resource); // add to resources
+ reloadConfiguration();
+ }
+
private static Pattern varPat = Pattern.compile("\\$\\{[^\\}\\$\u0020]+\\}");
private static int MAX_SUBST = 20;
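
A rough usage sketch of the new Configuration(boolean) constructor and reloadConfiguration() shown above. The class ReloadExample, its package, the file name my-conf.xml and the key some.key are illustrative assumptions, not part of the patch:

    package org.apache.hadoop.examples;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;

    public class ReloadExample {
      public static void main(String[] args) {
        // Skip the default hadoop-default.xml / hadoop-site.xml resources.
        Configuration conf = new Configuration(false);
        conf.addResource(new Path("my-conf.xml"));

        // Resources are parsed lazily, on first access.
        System.out.println(conf.get("some.key"));

        // ... suppose my-conf.xml is edited on disk here ...

        // Drop the loaded values and final parameters; the same resources
        // are re-read on the next get(). Values applied via conf.set()
        // overlay the reloaded values (see testReload below).
        conf.reloadConfiguration();
        System.out.println(conf.get("some.key"));
      }
    }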
Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ResourceManagerConf.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ResourceManagerConf.java?rev=674442&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ResourceManagerConf.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ResourceManagerConf.java Mon Jul 7 03:41:50 2008
@@ -0,0 +1,447 @@
+/** Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.PrintWriter;
+import java.util.Arrays;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Class providing access to resource manager configuration.
+ *
+ * Resource manager configuration involves setting up queues, and defining
+ * various properties for the queues. These are typically read from a file
+ * called resource-manager-conf.xml that must be in the classpath of the
+ * application. The class provides APIs to get/set and reload the
+ * configuration for the queues.
+ */
+class ResourceManagerConf {
+
+ /** Default file name from which the resource manager configuration is read. */
+ public static final String RM_CONF_FILE = "resource-manager-conf.xml";
+
+ /** Default value for guaranteed capacity of maps.
+ * The default value is set to <code>Integer.MAX_VALUE</code>, the idea
+ * being that the default is suitable for organizations that do not
+ * require setting up any queues.
+ */
+ public static final int DEFAULT_GUARANTEED_CAPACITY_MAPS =
+ Integer.MAX_VALUE;
+
+ /** Default value for guaranteed capacity of reduces.
+ * The default value is set to <code>Integer.MAX_VALUE</code>, the idea
+ * being that the default is suitable for organizations that do not
+ * require setting up any queues.
+ */
+ public static final int DEFAULT_GUARANTEED_CAPACITY_REDUCES =
+ Integer.MAX_VALUE;
+
+ /** Default value for reclaiming redistributed resources.
+ * The default value is set to <code>0</code>, the idea
+ * being that the default is suitable for organizations that do not
+ * require setting up any queues.
+ */
+ public static final int DEFAULT_RECLAIM_TIME_LIMIT = 0;
+
+ /** Default value for minimum resource limit per user per queue, as a
+ * percentage.
+ * The default value is set to <code>100</code>, the idea
+ * being that the default is suitable for organizations that do not
+ * require setting up any queues.
+ */
+ public static final int DEFAULT_MIN_USER_LIMIT_PERCENT = 100;
+
+ private static final String QUEUE_CONF_PROPERTY_NAME_PREFIX
+ = "hadoop.rm.queue.";
+
+ private Configuration rmConf;
+ private Set<String> queues;
+
+ /**
+ * Create a new ResourceManagerConf.
+ * This method reads from the default configuration file mentioned in
+ * {@link #RM_CONF_FILE}, that must be present in the classpath of the
+ * application.
+ */
+ public ResourceManagerConf() {
+ rmConf = new Configuration(false);
+ rmConf.addResource(RM_CONF_FILE);
+ }
+
+ /**
+ * Create a new ResourceManagerConf reading the specified configuration
+ * file.
+ *
+ * @param configFile {@link Path} to the configuration file containing
+ * the resource manager configuration.
+ */
+ public ResourceManagerConf(Path configFile) {
+ rmConf = new Configuration(false);
+ rmConf.addResource(configFile);
+ }
+
+ /**
+ * Return the set of configured queue names.
+ * @return set of configured queue names.
+ */
+ public synchronized Set<String> getQueues() {
+ if (queues == null) {
+ String[] vals = rmConf.getStrings("hadoop.rm.queue.names");
+ queues = new TreeSet<String>();
+ for (String val : vals) {
+ queues.add(val);
+ }
+ }
+ return queues;
+ }
+
+ /**
+ * Define the set of queues known to this configuration.
+ * This will override the queue names that are read from the
+ * configuration file.
+ * @param newQueues Set of queue names
+ */
+ public synchronized void setQueues(Set<String> newQueues) {
+ StringBuffer queueSb = new StringBuffer();
+ for (String queue : newQueues) {
+ queueSb.append(queue).append(',');
+ }
+ if (newQueues.size() > 0) {
+ String value = queueSb.substring(0, queueSb.length());
+ rmConf.set("hadoop.rm.queue.names", value);
+ queues = null;
+ }
+ }
+
+ /**
+ * Get the guaranteed number of maps for the specified queue.
+ *
+ * This method defaults to {@link #DEFAULT_GUARANTEED_CAPACITY_MAPS} if
+ * no value is specified in the configuration for this queue. If the queue
+ * name is unknown, this method throws a {@link IllegalArgumentException}
+ * @param queue name of the queue
+ * @return guaranteed number of maps for this queue.
+ */
+ public int getGuaranteedCapacityMaps(String queue) {
+ checkQueue(queue);
+ return rmConf.getInt(toFullPropertyName(queue,
+ "guaranteed-capacity-maps"),
+ DEFAULT_GUARANTEED_CAPACITY_MAPS);
+ }
+
+ /**
+ * Set the guaranteed number of maps for the specified queue
+ * @param queue Name of the queue
+ * @param value Guaranteed number of maps.
+ */
+ public void setGuaranteedCapacityMaps(String queue, int value) {
+ checkQueue(queue);
+ rmConf.setInt(toFullPropertyName(queue,"guaranteed-capacity-maps"),
+ value);
+ }
+
+ /**
+ * Get the guaranteed number of reduces for the specified queue.
+ *
+ * This method defaults to {@link #DEFAULT_GUARANTEED_CAPACITY_REDUCES} if
+ * no value is specified in the configuration for this queue. If the queue
+ * name is unknown, this method throws a {@link IllegalArgumentException}
+ * @param queue name of the queue
+ * @return guaranteed number of reduces for this queue.
+ */
+ public int getGuaranteedCapacityReduces(String queue) {
+ checkQueue(queue);
+ return rmConf.getInt(toFullPropertyName(queue,
+ "guaranteed-capacity-reduces"),
+ DEFAULT_GUARANTEED_CAPACITY_REDUCES);
+ }
+
+ /**
+ * Set the guaranteed number of reduces for the specified queue
+ * @param queue Name of the queue
+ * @param value Guaranteed number of reduces.
+ */
+ public void setGuaranteedCapacityReduces(String queue, int value) {
+ checkQueue(queue);
+ rmConf.setInt(toFullPropertyName(queue,"guaranteed-capacity-reduces"),
+ value);
+ }
+
+ /**
+ * Get the amount of time before which redistributed resources must be
+ * reclaimed for the specified queue.
+ *
+ * The resource manager distributes spare capacity from a free queue
+ * to ones which are in need of more resources. However, if a job
+ * submitted to the first queue requires the resources back, they must
+ * be reclaimed within the specified configuration time limit.
+ *
+ * This method defaults to {@link #DEFAULT_RECLAIM_TIME_LIMIT} if
+ * no value is specified in the configuration for this queue. If the queue
+ * name is unknown, this method throws a {@link IllegalArgumentException}
+ * @param queue name of the queue
+ * @return reclaim time limit for this queue.
+ */
+ public int getReclaimTimeLimit(String queue) {
+ checkQueue(queue);
+ return rmConf.getInt(toFullPropertyName(queue, "reclaim-time-limit"),
+ DEFAULT_RECLAIM_TIME_LIMIT);
+ }
+
+ /**
+ * Set the amount of time before which redistributed resources must be
+ * reclaimed for the specified queue.
+ * @param queue Name of the queue
+ * @param value Amount of time before which the redistributed resources
+ * must be reclaimed.
+ */
+ public void setReclaimTimeLimit(String queue, int value) {
+ checkQueue(queue);
+ rmConf.setInt(toFullPropertyName(queue, "reclaim-time-limit"), value);
+ }
+
+ /**
+ * Get the list of users who can submit jobs to this queue.
+ *
+ * This method defaults to <code>null</code> if no value is specified
+ * in the configuration for this queue. If the queue
+ * name is unknown, this method throws a {@link IllegalArgumentException}
+ * @param queue name of the queue
+ * @return list of users who can submit jobs to this queue.
+ */
+ public String[] getAllowedUsers(String queue) {
+ checkQueue(queue);
+ return rmConf.getStrings(toFullPropertyName(queue, "allowed-users"),
+ (String[])null);
+ }
+
+ /**
+ * Set the list of users who can submit jobs to this queue.
+ *
+ * If the queue name is unknown, this method throws a
+ * {@link IllegalArgumentException}
+ * @param queue name of the queue
+ * @param values list of users allowed to submit jobs to this queue.
+ */
+ public void setAllowedUsers(String queue, String... values) {
+ checkQueue(queue);
+ rmConf.setStrings(toFullPropertyName(queue, "allowed-users"), values);
+ }
+
+ /**
+ * Get the list of users who cannot submit jobs to this queue.
+ *
+ * This method defaults to <code>null</code> if no value is specified
+ * in the configuration for this queue. If the queue
+ * name is unknown, this method throws a {@link IllegalArgumentException}
+ * @param queue name of the queue
+ * @return list of users who cannot submit jobs to this queue.
+ */
+ public String[] getDeniedUsers(String queue) {
+ checkQueue(queue);
+ return rmConf.getStrings(toFullPropertyName(queue, "denied-users"),
+ (String[])null);
+ }
+
+ /**
+ * Set the list of users who cannot submit jobs to this queue.
+ *
+ * If the queue name is unknown, this method throws a
+ * {@link IllegalArgumentException}
+ * @param queue name of the queue
+ * @param values list of users who cannot submit jobs to this queue.
+ */
+ public void setDeniedUsers(String queue, String... values) {
+ checkQueue(queue);
+ rmConf.setStrings(toFullPropertyName(queue, "denied-users"), values);
+ }
+
+ /**
+ * Get whether users present in allowed users list will override values
+ * in the denied user list for this queue.
+ *
+ * If a user name is specified in both the allowed user list and the denied
+ * user list, this configuration determines whether to honor the allowed
+ * user list or the denied user list. This method defaults to
+ * <code>false</code> if no value is specified in the configuration for
+ * this queue. If the queue name is unknown, this method throws a
+ * {@link IllegalArgumentException}
+ * @param queue name of the queue
+ * @return true if the allowed users list overrides the denied users list, false otherwise.
+ */
+ public boolean doAllowedUsersOverride(String queue) {
+ checkQueue(queue);
+ return rmConf.getBoolean(toFullPropertyName(queue,
+ "allowed-users-override"), false);
+ }
+
+ /**
+ * Set whether users present in allowed users list will override values
+ * in the denied user list for this queue.
+ *
+ * If the queue name is unknown, this method throws a
+ * {@link IllegalArgumentException}
+ * @param queue name of the queue
+ * @param value true if the allowed users take priority, false otherwise.
+ */
+ public void setAllowedUsersOverride(String queue, boolean value) {
+ checkQueue(queue);
+ rmConf.setBoolean(toFullPropertyName(queue, "allowed-users-override"),
+ value);
+ }
+
+ /**
+ * Get whether priority is supported for this queue.
+ *
+ * If this value is false, then job priorities will be ignored in
+ * scheduling decisions. This method defaults to <code>false</code> if
+ * the property is not configured for this queue. If the queue name is
+ * unknown, this method throws a {@link IllegalArgumentException}
+ * @param queue name of the queue
+ * @return Whether this queue supports priority or not.
+ */
+ public boolean isPrioritySupported(String queue) {
+ checkQueue(queue);
+ return rmConf.getBoolean(toFullPropertyName(queue, "supports-priority"),
+ false);
+ }
+
+ /**
+ * Set whether priority is supported for this queue.
+ *
+ * If the queue name is unknown, this method throws a
+ * {@link IllegalArgumentException}
+ * @param queue name of the queue
+ * @param value true, if the queue must support priorities, false otherwise.
+ */
+ public void setPrioritySupported(String queue, boolean value) {
+ checkQueue(queue);
+ rmConf.setBoolean(toFullPropertyName(queue, "supports-priority"), value);
+ }
+
+ /**
+ * Get the minimum limit of resources for any user submitting jobs in
+ * this queue, in percentage.
+ *
+ * This method defaults to {@link #DEFAULT_MIN_USER_LIMIT_PERCENT} if
+ * no value is specified in the configuration for this queue. If the queue
+ * name is unknown, this method throws a {@link IllegalArgumentException}
+ * @param queue name of the queue
+ * @return minimum limit of resources, in percentage, that will be
+ * available for a user.
+ *
+ */
+ public int getMinimumUserLimitPercent(String queue) {
+ checkQueue(queue);
+ return rmConf.getInt(toFullPropertyName(queue,
+ "minimum-user-limit-percent"),
+ DEFAULT_MIN_USER_LIMIT_PERCENT);
+ }
+
+ /**
+ * Set the minimum limit of resources for any user submitting jobs in
+ * this queue, in percentage.
+ *
+ * If the queue name is unknown, this method throws a
+ * {@link IllegalArgumentException}
+ * @param queue name of the queue
+ * @param value minimum limit of resources for any user submitting jobs
+ * in this queue
+ */
+ public void setMinimumUserLimitPercent(String queue, int value) {
+ checkQueue(queue);
+ rmConf.setInt(toFullPropertyName(queue, "minimum-user-limit-percent"),
+ value);
+ }
+
+ /**
+ * Reload configuration by clearing the information read from the
+ * underlying configuration file.
+ */
+ public synchronized void reloadConfiguration() {
+ queues = null;
+ rmConf.reloadConfiguration();
+ }
+
+ /**
+ * Print configuration read from the underlying configuration file.
+ *
+ * @param writer {@link PrintWriter} to which the output must be written.
+ */
+ public void printConfiguration(PrintWriter writer) {
+
+ Set<String> queueSet = getQueues();
+ if (queueSet == null) {
+ writer.println("No queues configured.");
+ return;
+ }
+ StringBuffer sb = new StringBuffer();
+ for (String queue : queueSet) {
+ sb.append(queue).append(",");
+ }
+ writer.println("hadoop.rm.queue.names: " + sb.substring(0, sb.length()-1));
+
+ for (String queue : queueSet) {
+ writer.println(toFullPropertyName(queue, "guaranteed-capacity-maps") +
+ ": " + getGuaranteedCapacityMaps(queue));
+ writer.println(toFullPropertyName(queue, "guaranteed-capacity-reduces")
+
+ ": " + getGuaranteedCapacityReduces(queue));
+ writer.println(toFullPropertyName(queue, "reclaim-time-limit") +
+ ": " + getReclaimTimeLimit(queue));
+ writer.println(toFullPropertyName(queue, "minimum-user-limit-percent") +
+ ": " + getMinimumUserLimitPercent(queue));
+ writer.println(toFullPropertyName(queue, "supports-priority") +
+ ": " + isPrioritySupported(queue));
+ writer.println(toFullPropertyName(queue, "allowed-users-override") +
+ ": " + doAllowedUsersOverride(queue));
+ printUserList(writer, queue, "allowed-users", getAllowedUsers(queue));
+ printUserList(writer, queue, "denied-users", getDeniedUsers(queue));
+ }
+ }
+
+ private void printUserList(PrintWriter writer, String queue,
+ String listType, String[] users) {
+ if (users == null) {
+ writer.println(toFullPropertyName(queue, listType) + ": No users
configured.");
+ } else {
+ writer.println(toFullPropertyName(queue, listType) + ": ");
+ Arrays.sort(users);
+ for (String user : users) {
+ writer.println("\t" + user);
+ }
+ }
+ }
+
+ private synchronized void checkQueue(String queue) {
+ if (queues == null) {
+ queues = getQueues();
+ }
+ if (!queues.contains(queue)) {
+ throw new IllegalArgumentException("Queue " + queue + " is undefined.");
+ }
+ }
+
+ private static final String toFullPropertyName(String queue,
+ String property) {
+ return QUEUE_CONF_PROPERTY_NAME_PREFIX + queue + "." + property;
+ }
+}
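
For completeness, a sketch of the set/reload side of the API above, along the lines of the testQueueAddition and testReload cases in the test file below. The class QueueSetupExample, the file name, and the queue/user names are illustrative assumptions, not part of the patch; it lives in org.apache.hadoop.mapred because ResourceManagerConf is package-private:

    package org.apache.hadoop.mapred;

    import java.util.Set;
    import java.util.TreeSet;

    import org.apache.hadoop.fs.Path;

    public class QueueSetupExample {
      public static void main(String[] args) {
        ResourceManagerConf conf =
            new ResourceManagerConf(new Path("resource-manager-conf.xml"));

        // Override the queue list read from the file.
        Set<String> queues = new TreeSet<String>();
        queues.add("default");
        queues.add("research");
        conf.setQueues(queues);

        // Configure the new queue; anything left unset falls back to the
        // documented defaults (DEFAULT_* constants, null lists, false flags).
        conf.setGuaranteedCapacityMaps("research", 10);
        conf.setGuaranteedCapacityReduces("research", 5);
        conf.setAllowedUsers("research", "u1", "u2");
        conf.setPrioritySupported("research", true);

        // Re-read the underlying file later if it changes; values applied
        // through the setters above overlay whatever is reloaded.
        conf.reloadConfiguration();
      }
    }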
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/conf/TestConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/conf/TestConfiguration.java?rev=674442&r1=674441&r2=674442&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/conf/TestConfiguration.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/conf/TestConfiguration.java Mon Jul 7 03:41:50 2008
@@ -292,7 +292,48 @@
assertEquals(-20, conf.getInt("test.int3", 0));
assertEquals(-20, conf.getLong("test.int3", 0));
}
-
+
+ public void testReload() throws IOException {
+ out=new BufferedWriter(new FileWriter(CONFIG));
+ startConfig();
+ appendProperty("test.key1", "final-value1", true);
+ appendProperty("test.key2", "value2");
+ endConfig();
+ Path fileResource = new Path(CONFIG);
+ conf.addResource(fileResource);
+
+ out=new BufferedWriter(new FileWriter(CONFIG2));
+ startConfig();
+ appendProperty("test.key1", "value1");
+ appendProperty("test.key3", "value3");
+ endConfig();
+ Path fileResource1 = new Path(CONFIG2);
+ conf.addResource(fileResource1);
+
+ // add a few values via set.
+ conf.set("test.key3", "value4");
+ conf.set("test.key4", "value5");
+
+ assertEquals("final-value1", conf.get("test.key1"));
+ assertEquals("value2", conf.get("test.key2"));
+ assertEquals("value4", conf.get("test.key3"));
+ assertEquals("value5", conf.get("test.key4"));
+
+ // change values in the test file...
+ out=new BufferedWriter(new FileWriter(CONFIG));
+ startConfig();
+ appendProperty("test.key1", "final-value1");
+ appendProperty("test.key3", "final-value3", true);
+ endConfig();
+
+ conf.reloadConfiguration();
+ assertEquals("value1", conf.get("test.key1"));
+ // overlayed property overrides.
+ assertEquals("value4", conf.get("test.key3"));
+ assertEquals(null, conf.get("test.key2"));
+ assertEquals("value5", conf.get("test.key4"));
+ }
+
public static void main(String[] argv) throws Exception {
junit.textui.TestRunner.main(new String[]{
TestConfiguration.class.getName()
Added: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestResourceManagerConf.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestResourceManagerConf.java?rev=674442&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestResourceManagerConf.java (added)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestResourceManagerConf.java Mon Jul 7 03:41:50 2008
@@ -0,0 +1,447 @@
+/** Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.PipedReader;
+import java.io.PipedWriter;
+import java.io.PrintWriter;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.Set;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.StringUtils;
+
+public class TestResourceManagerConf extends TestCase {
+
+ private static final String TEST_CONF_FILE =
+ new File(".", "test-conf.xml").getAbsolutePath();
+ private PrintWriter writer;
+ private Map<String, String> defaultProperties;
+ private ResourceManagerConf testConf;
+
+ public TestResourceManagerConf() {
+ defaultProperties = setupQueueProperties(
+ new String[] { "guaranteed-capacity-maps",
+ "guaranteed-capacity-reduces",
+ "reclaim-time-limit",
+ "allowed-users",
+ "denied-users",
+ "allowed-users-override",
+ "supports-priority",
+ "minimum-user-limit-percent" },
+ new String[] { String.valueOf(Integer.MAX_VALUE),
+ String.valueOf(Integer.MAX_VALUE),
+ "0", null, null, "false",
+ "true", "100" }
+ );
+ }
+
+ public void setUp() throws IOException {
+ openFile();
+ }
+
+ public void tearDown() throws IOException {
+ File testConfFile = new File(TEST_CONF_FILE);
+ if (testConfFile.exists()) {
+ testConfFile.delete();
+ }
+ }
+
+ public void testDefaults() {
+ testConf = new ResourceManagerConf();
+ Map<String, Map<String, String>> queueDetails
+ = new HashMap<String, Map<String,String>>();
+ queueDetails.put("default", defaultProperties);
+ checkQueueProperties(testConf, queueDetails);
+ }
+
+ public void testQueues() {
+
+ Map<String, String> q1Props = setupQueueProperties(
+ new String[] { "guaranteed-capacity-maps",
+ "guaranteed-capacity-reduces",
+ "reclaim-time-limit",
+ "allowed-users",
+ "denied-users",
+ "allowed-users-override",
+ "supports-priority",
+ "minimum-user-limit-percent" },
+ new String[] { "10", "5", "600", "u1,u2,u3", "u1,g1", "false",
+ "true", "25" }
+ );
+
+ Map<String, String> q2Props = setupQueueProperties(
+ new String[] { "guaranteed-capacity-maps",
+ "guaranteed-capacity-reduces",
+ "reclaim-time-limit",
+ "allowed-users",
+ "denied-users",
+ "allowed-users-override",
+ "supports-priority",
+ "minimum-user-limit-percent" },
+ new String[] { "100", "50", "6000", "u4,u5,u6", "u6,g2", "true",
+ "false", "50" }
+ );
+
+ startConfig();
+ writeQueueDetails("default", q1Props);
+ writeQueueDetails("research", q2Props);
+ writeQueues("default,research");
+ endConfig();
+
+ testConf = new ResourceManagerConf(new Path(TEST_CONF_FILE));
+
+ Map<String, Map<String, String>> queueDetails
+ = new HashMap<String, Map<String,String>>();
+ queueDetails.put("default", q1Props);
+ queueDetails.put("research", q2Props);
+ checkQueueProperties(testConf, queueDetails);
+ }
+
+ public void testQueueWithDefaultProperties() {
+ Map<String, String> q1Props = setupQueueProperties(
+ new String[] { "guaranteed-capacity-maps",
+ "guaranteed-capacity-reduces",
+ "allowed-users",
+ "allowed-users-override",
+ "minimum-user-limit-percent" },
+ new String[] { "20", "10", "u7,u8,u9", "false", "75" }
+ );
+ startConfig();
+ writeQueueDetails("default", q1Props);
+ writeQueues("default");
+ endConfig();
+
+ testConf = new ResourceManagerConf(new Path(TEST_CONF_FILE));
+
+ Map<String, Map<String, String>> queueDetails
+ = new HashMap<String, Map<String,String>>();
+ Map<String, String> expProperties = new HashMap<String, String>();
+ for (String key : q1Props.keySet()) {
+ expProperties.put(key, q1Props.get(key));
+ }
+ expProperties.put("reclaim-time-limit", "0");
+ expProperties.put("denied-users", null);
+ expProperties.put("supports-priority", "false");
+ queueDetails.put("default", expProperties);
+ checkQueueProperties(testConf, queueDetails);
+ }
+
+ public void testInvalidQueue() {
+ try {
+ testConf = new ResourceManagerConf();
+ testConf.getGuaranteedCapacityMaps("invalid");
+ fail("Should not return value for invalid queue");
+ } catch (IllegalArgumentException iae) {
+ assertEquals("Queue invalid is undefined.", iae.getMessage());
+ }
+ }
+
+ public void testQueueAddition() {
+ testConf = new ResourceManagerConf();
+ Set<String> queues = new HashSet<String>();
+ queues.add("default");
+ queues.add("newqueue");
+ testConf.setQueues(queues);
+
+ testConf.setGuaranteedCapacityMaps("newqueue", 1);
+ testConf.setGuaranteedCapacityReduces("newqueue", 1);
+ testConf.setReclaimTimeLimit("newqueue", 30);
+ testConf.setMinimumUserLimitPercent("newqueue", 40);
+ testConf.setAllowedUsers("newqueue", "a", "b", "c");
+ testConf.setDeniedUsers("newqueue", "d", "e", "f");
+ testConf.setAllowedUsersOverride("newqueue", true);
+ testConf.setPrioritySupported("newqueue", false);
+
+ Map<String, String> q1Props = setupQueueProperties(
+ new String[] { "guaranteed-capacity-maps",
+ "guaranteed-capacity-reduces",
+ "reclaim-time-limit",
+ "allowed-users",
+ "denied-users",
+ "allowed-users-override",
+ "supports-priority",
+ "minimum-user-limit-percent" },
+ new String[] { "1", "1", "30", "a,b,c", "d,e,f", "true",
+ "false", "40" }
+ );
+
+ Map<String, Map<String, String>> queueDetails =
+ new HashMap<String, Map<String, String>>();
+ queueDetails.put("default", defaultProperties);
+ queueDetails.put("newqueue", q1Props);
+ checkQueueProperties(testConf, queueDetails);
+ }
+
+ public void testReload() throws IOException {
+ // use the setup in the test case testQueues as a base...
+ testQueues();
+
+ // write new values to the file...
+ Map<String, String> q1Props = setupQueueProperties(
+ new String[] { "guaranteed-capacity-maps",
+ "guaranteed-capacity-reduces",
+ "reclaim-time-limit",
+ "allowed-users",
+ "denied-users",
+ "allowed-users-override",
+ "supports-priority",
+ "minimum-user-limit-percent" },
+ new String[] { "20", "5", "600", "u1,u2,u3,u4", "u1,g1", "false",
+ "true", "40" }
+ );
+
+ Map<String, String> q2Props = setupQueueProperties(
+ new String[] { "guaranteed-capacity-maps",
+ "guaranteed-capacity-reduces",
+ "reclaim-time-limit",
+ "allowed-users",
+ "denied-users",
+ "allowed-users-override",
+ "supports-priority",
+ "minimum-user-limit-percent" },
+ new String[] { "1000", "500", "3000", "u4,u6", "g2", "false",
+ "false", "50" }
+ );
+
+ openFile();
+ startConfig();
+ writeQueueDetails("default", q1Props);
+ writeQueueDetails("production", q2Props);
+ writeQueues("default,production");
+ endConfig();
+
+ testConf.reloadConfiguration();
+
+ Map<String, Map<String, String>> queueDetails
+ = new HashMap<String, Map<String, String>>();
+ queueDetails.put("default", q1Props);
+ queueDetails.put("production", q2Props);
+ checkQueueProperties(testConf, queueDetails);
+ }
+
+ public void testPrint() throws IOException, InterruptedException {
+
+ Map<String, String> q1Props = setupQueueProperties(
+ new String[] { "guaranteed-capacity-maps",
+ "guaranteed-capacity-reduces",
+ "reclaim-time-limit",
+ "allowed-users",
+ "denied-users",
+ "allowed-users-override",
+ "supports-priority",
+ "minimum-user-limit-percent" },
+ new String[] { "20", "5", "600", "u2,a1,c3,b4", "u1,g1", "false",
+ "true", "40" }
+ );
+
+ Map<String, String> q2Props = setupQueueProperties(
+ new String[] { "guaranteed-capacity-maps",
+ "guaranteed-capacity-reduces",
+ "reclaim-time-limit",
+ "allowed-users",
+ "denied-users",
+ "allowed-users-override",
+ "supports-priority",
+ "minimum-user-limit-percent" },
+ new String[] { "1000", "500", "3000", "e5,d6", "g2", "false",
+ "false", "50" }
+ );
+
+ startConfig();
+ writeQueueDetails("default", q1Props);
+ writeQueueDetails("research", q2Props);
+ writeQueues("default,research");
+ endConfig();
+
+ testConf = new ResourceManagerConf(new Path(TEST_CONF_FILE));
+
+ PipedWriter pw = new PipedWriter();
+ PrintWriter writer = new PrintWriter(pw);
+ PipedReader pr = new PipedReader(pw);
+ BufferedReader br = new BufferedReader(pr);
+
+ PrintReaderThread prThread = new PrintReaderThread(br);
+ prThread.start();
+
+ testConf.printConfiguration(writer);
+ writer.close();
+
+ prThread.join();
+ ArrayList<String> output = prThread.getOutput();
+
+ ListIterator iter = output.listIterator();
+
+ assertEquals("hadoop.rm.queue.names: default,research", iter.next());
+ checkPrintOutput("default", q1Props, iter,
+ new String[] {"a1", "b4", "c3", "u2"},
+ new String[] {"g1", "u1"});
+ checkPrintOutput("research", q2Props, iter,
+ new String[] {"d6", "e5" },
+ new String[] {"g2"});
+ }
+
+ private void checkPrintOutput(String queue,
+ Map<String, String> queueProps,
+ ListIterator iter,
+ String[] allowedUsers,
+ String[] deniedUsers) {
+
+ String[] propsOrder = {"guaranteed-capacity-maps",
+ "guaranteed-capacity-reduces",
+ "reclaim-time-limit",
+ "minimum-user-limit-percent",
+ "supports-priority",
+ "allowed-users-override"};
+
+ for (String prop : propsOrder) {
+ assertEquals(("hadoop.rm.queue." + queue + "." + prop + ": "
+ + queueProps.get(prop)), iter.next());
+ }
+
+ checkUserList(queue, iter, allowedUsers, "allowed-users");
+ checkUserList(queue, iter, deniedUsers, "denied-users");
+ }
+
+ private void checkUserList(String queue,
+ ListIterator iter,
+ String[] userList,
+ String listType) {
+ assertEquals("hadoop.rm.queue." + queue + "." + listType + ": ",
+ iter.next());
+ for (String user : userList) {
+ assertEquals("\t" + user, iter.next());
+ }
+ }
+
+ private void checkQueueProperties(
+ ResourceManagerConf testConf,
+ Map<String, Map<String, String>> queueDetails) {
+ Set<String> queues = testConf.getQueues();
+ assertEquals(queueDetails.keySet().size(), queues.size());
+ for (String name : queueDetails.keySet()) {
+ assertTrue(queues.contains(name));
+ }
+
+ for (String queueName : queueDetails.keySet()) {
+ Map<String, String> map = queueDetails.get(queueName);
+ assertEquals(Integer.parseInt(map.get("guaranteed-capacity-maps")),
+ testConf.getGuaranteedCapacityMaps(queueName));
+ assertEquals(Integer.parseInt(map.get("guaranteed-capacity-reduces")),
+ testConf.getGuaranteedCapacityReduces(queueName));
+ assertEquals(Integer.parseInt(map.get("minimum-user-limit-percent")),
+ testConf.getMinimumUserLimitPercent(queueName));
+ assertEquals(Integer.parseInt(map.get("reclaim-time-limit")),
+ testConf.getReclaimTimeLimit(queueName));
+ assertEquals(Boolean.parseBoolean(map.get("supports-priority")),
+ testConf.isPrioritySupported(queueName));
+ String[] expAllowedUsers = StringUtils.getStrings(
+ map.get("allowed-users"));
+ String[] allowedUsers = testConf.getAllowedUsers(queueName);
+ assertTrue(Arrays.equals(expAllowedUsers, allowedUsers));
+ String[] expDeniedUsers = StringUtils.getStrings(
+ map.get("denied-users"));
+ String[] deniedUsers = testConf.getDeniedUsers(queueName);
+ assertTrue(Arrays.equals(expDeniedUsers, deniedUsers));
+ assertEquals(Boolean.parseBoolean(map.get("allowed-users-override")),
+ testConf.doAllowedUsersOverride(queueName));
+ }
+ }
+
+ private Map<String, String> setupQueueProperties(String[] keys,
+ String[] values) {
+ HashMap<String, String> map = new HashMap<String, String>();
+ for(int i=0; i<keys.length; i++) {
+ map.put(keys[i], values[i]);
+ }
+ return map;
+ }
+
+ private void openFile() throws IOException {
+ FileWriter fw = new FileWriter(TEST_CONF_FILE);
+ BufferedWriter bw = new BufferedWriter(fw);
+ writer = new PrintWriter(bw);
+ }
+
+ private void startConfig() {
+ writer.println("<?xml version=\"1.0\"?>");
+ writer.println("<configuration>");
+ }
+
+ private void writeQueues(String queueList) {
+ writer.println("<property>");
+ writer.println("<name>hadoop.rm.queue.names</name>");
+ writer.println("<value>"+queueList+"</value>");
+ writer.println("</property>");
+ }
+
+ private void writeQueueDetails(String queue, Map<String, String> props) {
+ for (String key : props.keySet()) {
+ writer.println("<property>");
+ writer.println("<name>hadoop.rm.queue." + queue + "." + key +
+ "</name>");
+ writer.println("<value>"+props.get(key)+"</value>");
+ writer.println("</property>");
+ }
+ }
+
+ private void endConfig() {
+ writer.println("</configuration>");
+ writer.close();
+ }
+
+ class PrintReaderThread extends Thread {
+
+ private BufferedReader reader;
+ private ArrayList<String> lines;
+
+ public PrintReaderThread(BufferedReader reader) {
+ this.reader = reader;
+ lines = new ArrayList<String>();
+ }
+
+ public void run() {
+ try {
+ String line = reader.readLine();
+ while (line != null) {
+ lines.add(line);
+ line = reader.readLine();
+ }
+ } catch (IOException ioe) {
+ lines.clear();
+ }
+ }
+
+ public ArrayList<String> getOutput() {
+ return lines;
+ }
+ }
+}