Repository: storm
Updated Branches:
  refs/heads/master 08d9d5a87 -> e1dd247ce


[STORM-2201] Add dynamic scheduler configuration loading


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/f1f95449
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/f1f95449
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/f1f95449

Branch: refs/heads/master
Commit: f1f954493408e5c62d4a0e061ae6180c8ad7972a
Parents: d06bb38
Author: Ethan Li <[email protected]>
Authored: Mon Jul 10 12:04:50 2017 -0500
Committer: Ethan Li <[email protected]>
Committed: Tue Aug 29 12:54:00 2017 -0500

----------------------------------------------------------------------
 docs/IConfigLoader.md                           |  51 +++
 pom.xml                                         |   7 +
 storm-server/pom.xml                            |   5 +
 .../java/org/apache/storm/DaemonConfig.java     |  94 +++--
 .../multitenant/MultitenantScheduler.java       |  62 ++-
 .../resource/ResourceAwareScheduler.java        |  89 +++--
 .../utils/ArtifactoryConfigLoader.java          | 395 +++++++++++++++++++
 .../utils/ArtifactoryConfigLoaderFactory.java   |  50 +++
 .../utils/ConfigLoaderFactoryService.java       |  68 ++++
 .../storm/scheduler/utils/FileConfigLoader.java |  72 ++++
 .../utils/FileConfigLoaderFactory.java          |  51 +++
 .../storm/scheduler/utils/IConfigLoader.java    |  32 ++
 .../scheduler/utils/IConfigLoaderFactory.java   |  35 ++
 .../utils/ArtifactoryConfigLoaderTest.java      | 235 +++++++++++
 .../scheduler/utils/FileConfigLoaderTest.java   | 115 ++++++
 15 files changed, 1275 insertions(+), 86 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/f1f95449/docs/IConfigLoader.md
----------------------------------------------------------------------
diff --git a/docs/IConfigLoader.md b/docs/IConfigLoader.md
new file mode 100644
index 0000000..3c2f4de
--- /dev/null
+++ b/docs/IConfigLoader.md
@@ -0,0 +1,51 @@
+---
+title: IConfigLoader
+layout: documentation
+documentation: true
+---
+
+
+### Introduction
+IConfigLoader is an interface designed to allow dynamic loading of scheduler 
resource constraints. Currently, the MultiTenant scheduler uses this interface 
to dynamically load the number of isolated nodes a given user has been 
guaranteed, and the ResourceAwareScheduler uses the interface to dynamically 
load per user resource guarantees.
+
+The following interface is provided for users to create an IConfigLoader 
instance based on the scheme of the `scheduler.config.loader.uri`.
+```
+ConfigLoaderFactoryService.createConfigLoader(Map<String, Object> conf)
+``` 
+
+------
+
+### Interface
+```
+public interface IConfigLoader {
+    Map<?,?> load(String configKey);
+};
+```
+#### Description
+  - load is called by the scheduler whenever it wishes to retrieve the most 
recent configuration map.
+ 
+#### Loader Configuration
+The loaders are dynamically selected and dynamically configured through 
configuration items in the scheduler implementations.
+
+##### Example
+```
+scheduler.config.loader.uri: 
"artifactory+http://artifactory.my.company.com:8000/artifactory/configurations/clusters/my_cluster/ras_pools"
+scheduler.config.loader.timeout.secs: 30
+```
+Or
+```
+scheduler.config.loader.uri: "file:///path/to/my/config.yaml"
+```
+### Implementations
+
+There are currently two implementations of IConfigLoader
+ - org.apache.storm.scheduler.utils.ArtifactoryConfigLoader: Load 
configurations from an Artifactory server. 
+ It will be used if users prepend `artifactory+` to the scheme of the real URI and 
set the result as the value of `scheduler.config.loader.uri`.
+ - org.apache.storm.scheduler.utils.FileConfigLoader: Load configurations from 
a local file. It will be used if users use the `file` scheme.
+
+#### Configurations
+ - scheduler.config.loader.uri: For `ArtifactoryConfigLoader`, this can either 
be a reference to an individual file in Artifactory or to a directory.  If it 
is a directory, the file with the lexicographically largest name will be returned.
+ For `FileConfigLoader`, this is the URI pointing to a file.
+ - scheduler.config.loader.timeout.secs: Currently only used in 
`ArtifactoryConfigLoader`. It is the amount of time an http connection to the 
artifactory server will wait before timing out. The default is 10.
+ - scheduler.config.loader.polltime.secs: Currently only used in 
`ArtifactoryConfigLoader`. It is the frequency at which the plugin will call 
out to artifactory instead of returning the most recently cached result. The 
default is 600 seconds.
+ - scheduler.config.loader.artifactory.base.directory: Only used in 
`ArtifactoryConfigLoader`. It is the part of the uri, configurable in 
Artifactory, which represents the top of the directory tree. It defaults to 
"/artifactory".
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/f1f95449/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index c3a04b9..9ea2501 100644
--- a/pom.xml
+++ b/pom.xml
@@ -261,6 +261,7 @@
         <disruptor.version>3.3.2</disruptor.version>
         <jgrapht.version>0.9.0</jgrapht.version>
         <guava.version>16.0.1</guava.version>
+        <auto-service.version>1.0-rc3</auto-service.version>
         <netty.version>3.9.0.Final</netty.version>
         <sysout-over-slf4j.version>1.0.2</sysout-over-slf4j.version>
         <log4j-over-slf4j.version>1.6.6</log4j-over-slf4j.version>
@@ -899,6 +900,12 @@
                 <artifactId>guava</artifactId>
                 <version>${guava.version}</version>
             </dependency>
+                <dependency>
+                    <groupId>com.google.auto.service</groupId>
+                    <artifactId>auto-service</artifactId>
+                    <version>${auto-service.version}</version>
+                    <optional>true</optional>
+                </dependency>
             <dependency>
                 <groupId>org.apache.logging.log4j</groupId>
                 <artifactId>log4j-api</artifactId>

http://git-wip-us.apache.org/repos/asf/storm/blob/f1f95449/storm-server/pom.xml
----------------------------------------------------------------------
diff --git a/storm-server/pom.xml b/storm-server/pom.xml
index 311b58f..b2a2c0e 100644
--- a/storm-server/pom.xml
+++ b/storm-server/pom.xml
@@ -59,6 +59,11 @@
             <groupId>org.apache.commons</groupId>
             <artifactId>commons-compress</artifactId>
         </dependency>
+        <dependency>
+            <groupId>com.google.auto.service</groupId>
+            <artifactId>auto-service</artifactId>
+            <optional>true</optional>
+        </dependency>
 
         <!-- test -->
         <dependency>

http://git-wip-us.apache.org/repos/asf/storm/blob/f1f95449/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
----------------------------------------------------------------------
diff --git a/storm-server/src/main/java/org/apache/storm/DaemonConfig.java 
b/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
index a1292e0..75a9cad 100644
--- a/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
+++ b/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
@@ -221,7 +221,7 @@ public class DaemonConfig implements Validated {
     public static final String NIMBUS_TOPOLOGY_VALIDATOR = 
"nimbus.topology.validator";
 
     /**
-     * Class name for authorization plugin for Nimbus
+     * Class name for authorization plugin for Nimbus.
      */
     @isString
     public static final String NIMBUS_AUTHORIZER = "nimbus.authorizer";
@@ -271,7 +271,7 @@ public class DaemonConfig implements Validated {
     public static final String UI_CENTRAL_LOGGING_URL = 
"ui.central.logging.url";
 
     /**
-     * HTTP UI port for log viewer
+     * HTTP UI port for log viewer.
      */
     @isInteger
     @isPositiveNumber
@@ -284,46 +284,46 @@ public class DaemonConfig implements Validated {
     public static final String LOGVIEWER_CHILDOPTS = "logviewer.childopts";
 
     /**
-     * How often to clean up old log files
+     * How often to clean up old log files.
      */
     @isInteger
     @isPositiveNumber
     public static final String LOGVIEWER_CLEANUP_INTERVAL_SECS = 
"logviewer.cleanup.interval.secs";
 
     /**
-     * How many minutes since a log was last modified for the log to be 
considered for clean-up
+     * How many minutes since a log was last modified for the log to be 
considered for clean-up.
      */
     @isInteger
     @isPositiveNumber
     public static final String LOGVIEWER_CLEANUP_AGE_MINS = 
"logviewer.cleanup.age.mins";
 
     /**
-     * The maximum number of bytes all worker log files can take up in MB
+     * The maximum number of bytes all worker log files can take up in MB.
      */
     @isPositiveNumber
     public static final String LOGVIEWER_MAX_SUM_WORKER_LOGS_SIZE_MB = 
"logviewer.max.sum.worker.logs.size.mb";
 
     /**
-     * The maximum number of bytes per worker's files can take up in MB
+     * The maximum number of bytes per worker's files can take up in MB.
      */
     @isPositiveNumber
     public static final String LOGVIEWER_MAX_PER_WORKER_LOGS_SIZE_MB = 
"logviewer.max.per.worker.logs.size.mb";
 
     /**
-     * Storm Logviewer HTTPS port
+     * Storm Logviewer HTTPS port.
      */
     @isInteger
     @isPositiveNumber
     public static final String LOGVIEWER_HTTPS_PORT = "logviewer.https.port";
 
     /**
-     * Path to the keystore containing the certs used by Storm Logviewer for 
HTTPS communications
+     * Path to the keystore containing the certs used by Storm Logviewer for 
HTTPS communications.
      */
     @isString
     public static final String LOGVIEWER_HTTPS_KEYSTORE_PATH = 
"logviewer.https.keystore.path";
 
     /**
-     * Password for the keystore for HTTPS for Storm Logviewer
+     * Password for the keystore for HTTPS for Storm Logviewer.
      */
     @isString
     public static final String LOGVIEWER_HTTPS_KEYSTORE_PASSWORD = 
"logviewer.https.keystore.password";
@@ -342,13 +342,13 @@ public class DaemonConfig implements Validated {
     public static final String LOGVIEWER_HTTPS_KEY_PASSWORD = 
"logviewer.https.key.password";
 
     /**
-     * Path to the truststore containing the certs used by Storm Logviewer for 
HTTPS communications
+     * Path to the truststore containing the certs used by Storm Logviewer for 
HTTPS communications.
      */
     @isString
     public static final String LOGVIEWER_HTTPS_TRUSTSTORE_PATH = 
"logviewer.https.truststore.path";
 
     /**
-     * Password for the truststore for HTTPS for Storm Logviewer
+     * Password for the truststore for HTTPS for Storm Logviewer.
      */
     @isString
     public static final String LOGVIEWER_HTTPS_TRUSTSTORE_PASSWORD = 
"logviewer.https.truststore.password";
@@ -370,13 +370,13 @@ public class DaemonConfig implements Validated {
     public static final String LOGVIEWER_HTTPS_NEED_CLIENT_AUTH = 
"logviewer.https.need.client.auth";
 
     /**
-     * A list of users allowed to view logs via the Log Viewer
+     * A list of users allowed to view logs via the Log Viewer.
      */
     @isStringList
     public static final String LOGS_USERS = "logs.users";
 
     /**
-     * A list of groups allowed to view logs via the Log Viewer
+     * A list of groups allowed to view logs via the Log Viewer.
      */
     @isStringList
     public static final String LOGS_GROUPS = "logs.groups";
@@ -394,19 +394,19 @@ public class DaemonConfig implements Validated {
     public static final String UI_CHILDOPTS = "ui.childopts";
 
     /**
-     * A class implementing javax.servlet.Filter for authenticating/filtering 
UI requests
+     * A class implementing javax.servlet.Filter for authenticating/filtering 
UI requests.
      */
     @isString
     public static final String UI_FILTER = "ui.filter";
 
     /**
-     * Initialization parameters for the javax.servlet.Filter
+     * Initialization parameters for the javax.servlet.Filter.
      */
     @isMapEntryType(keyType = String.class, valueType = String.class)
     public static final String UI_FILTER_PARAMS = "ui.filter.params";
 
     /**
-     * The size of the header buffer for the UI in bytes
+     * The size of the header buffer for the UI in bytes.
      */
     @isInteger
     @isPositiveNumber
@@ -555,7 +555,7 @@ public class DaemonConfig implements Validated {
     public static final String DRPC_HTTPS_NEED_CLIENT_AUTH = 
"drpc.https.need.client.auth";
 
     /**
-     * Class name for authorization plugin for DRPC client
+     * Class name for authorization plugin for DRPC client.
      */
     @isString
     public static final String DRPC_AUTHORIZER = "drpc.authorizer";
@@ -578,7 +578,7 @@ public class DaemonConfig implements Validated {
     public static final String DRPC_CHILDOPTS = "drpc.childopts";
 
     /**
-     * the metadata configured on the supervisor
+     * the metadata configured on the supervisor.
      */
     @isMapEntryType(keyType = String.class, valueType = String.class)
     public static final String SUPERVISOR_SCHEDULER_META = 
"supervisor.scheduler.meta";
@@ -590,7 +590,7 @@ public class DaemonConfig implements Validated {
      */
     @isNoDuplicateInList
     @NotNull
-    
@isListEntryCustom(entryValidatorClasses={ConfigValidation.IntegerValidator.class,ConfigValidation.PositiveNumberValidator.class})
+    @isListEntryCustom(entryValidatorClasses = 
{ConfigValidation.IntegerValidator.class,ConfigValidation.PositiveNumberValidator.class})
     public static final String SUPERVISOR_SLOTS_PORTS = 
"supervisor.slots.ports";
 
     /**
@@ -652,14 +652,14 @@ public class DaemonConfig implements Validated {
     public static final String NIMBUS_SLOTS_PER_TOPOLOGY = 
"nimbus.slots.perTopology";
 
     /**
-     * A class implementing javax.servlet.Filter for DRPC HTTP requests
+     * A class implementing javax.servlet.Filter for DRPC HTTP requests.
      */
     @isString
     public static final String DRPC_HTTP_FILTER = "drpc.http.filter";
 
     /**
      * Initialization parameters for the javax.servlet.Filter of the DRPC HTTP
-     * service
+     * service.
      */
     @isMapEntryType(keyType = String.class, valueType = String.class)
     public static final String DRPC_HTTP_FILTER_PARAMS = 
"drpc.http.filter.params";
@@ -679,7 +679,7 @@ public class DaemonConfig implements Validated {
     public static final String SUPERVISOR_CHILDOPTS = "supervisor.childopts";
 
     /**
-     * How many seconds to sleep for before shutting down threads on worker
+     * How many seconds to sleep for before shutting down threads on worker.
      */
     @isInteger
     @isPositiveNumber
@@ -739,7 +739,7 @@ public class DaemonConfig implements Validated {
     /**
      * The command launched supervisor with worker arguments
      * pid, action and [target_directory]
-     * Where action is - start profile, stop profile, jstack, heapdump and 
kill against pid
+     * Where action is - start profile, stop profile, jstack, heapdump and 
kill against pid.
      *
      */
     @isString
@@ -762,7 +762,7 @@ public class DaemonConfig implements Validated {
     public static final String 
STORM_CLUSTER_METRICS_CONSUMER_PUBLISH_INTERVAL_SECS = 
"storm.cluster.metrics.consumer.publish.interval.secs";
 
     /**
-     * Enables user-first classpath. See topology.classpath.beginning
+     * Enables user-first classpath. See topology.classpath.beginning.
      */
     @isBoolean
     public static final String 
STORM_TOPOLOGY_CLASSPATH_BEGINNING_ENABLED="storm.topology.classpath.beginning.enabled";
@@ -792,6 +792,38 @@ public class DaemonConfig implements Validated {
     public static final String ISOLATION_SCHEDULER_MACHINES = 
"isolation.scheduler.machines";
 
     /**
+     * For ArtifactoryConfigLoader, this can either be a reference to an 
individual file in Artifactory or to a directory.
+     * If it is a directory, the file with the lexicographically largest name will 
be returned. Users need to add "artifactory+" to the beginning of
+     * the real URI to use ArtifactoryConfigLoader.
+     * For FileConfigLoader, this is the URI pointing to a file.
+     */
+    @isString
+    public static final String SCHEDULER_CONFIG_LOADER_URI = 
"scheduler.config.loader.uri";
+
+    /**
+     * It is the frequency at which the plugin will call out to artifactory 
instead of returning the most recently cached result.
+     * Currently it's only used in ArtifactoryConfigLoader.
+     */
+    @isInteger
+    @isPositiveNumber
+    public static final String SCHEDULER_CONFIG_LOADER_POLLTIME_SECS = 
"scheduler.config.loader.polltime.secs";
+
+    /**
+     * It is the amount of time an http connection to the artifactory server 
will wait before timing out.
+     * Currently it's only used in ArtifactoryConfigLoader.
+     */
+    @isInteger
+    @isPositiveNumber
+    public static final String SCHEDULER_CONFIG_LOADER_TIMEOUT_SECS = 
"scheduler.config.loader.timeout.secs";
+
+    /**
+     * It is the part of the uri, configurable in Artifactory, which 
represents the top of the directory tree.
+     * It's only used in ArtifactoryConfigLoader.
+     */
+    @isString
+    public static final String  
SCHEDULER_CONFIG_LOADER_ARTIFACTORY_BASE_DIRECTORY = 
"scheduler.config.loader.artifactory.base.directory";
+
+    /**
      * A map from the user name to the number of machines that should that 
user is allowed to use. Set storm.scheduler
      * to org.apache.storm.scheduler.multitenant.MultitenantScheduler
      */
@@ -806,14 +838,14 @@ public class DaemonConfig implements Validated {
     public static final String RESOURCE_AWARE_SCHEDULER_USER_POOLS = 
"resource.aware.scheduler.user.pools";
 
     /**
-     * The class that specifies the eviction strategy to use in 
ResourceAwareScheduler
+     * The class that specifies the eviction strategy to use in 
ResourceAwareScheduler.
      */
     @NotNull
     @isImplementationOfClass(implementsClass = IEvictionStrategy.class)
     public static final String RESOURCE_AWARE_SCHEDULER_EVICTION_STRATEGY = 
"resource.aware.scheduler.eviction.strategy";
 
     /**
-     * the class that specifies the scheduling priority strategy to use in 
ResourceAwareScheduler
+     * the class that specifies the scheduling priority strategy to use in 
ResourceAwareScheduler.
      */
     @NotNull
     @isImplementationOfClass(implementsClass = 
ISchedulingPriorityStrategy.class)
@@ -842,7 +874,7 @@ public class DaemonConfig implements Validated {
     public static final String STORM_CGROUP_RESOURCES = 
"storm.cgroup.resources";
 
     /**
-     * name for the cgroup hierarchy
+     * name for the cgroup hierarchy.
      */
     @isString
     public static final String STORM_CGROUP_HIERARCHY_NAME = 
"storm.cgroup.hierarchy.name";
@@ -856,25 +888,25 @@ public class DaemonConfig implements Validated {
     public static final String STORM_RESOURCE_ISOLATION_PLUGIN_ENABLE = 
"storm.resource.isolation.plugin.enable";
 
     /**
-     * root directory for cgoups
+     * root directory for cgroups.
      */
     @isString
     public static String STORM_SUPERVISOR_CGROUP_ROOTDIR = 
"storm.supervisor.cgroup.rootdir";
 
     /**
-     * the manually set memory limit (in MB) for each CGroup on supervisor node
+     * the manually set memory limit (in MB) for each CGroup on supervisor 
node.
      */
     @isPositiveNumber
     public static String STORM_WORKER_CGROUP_MEMORY_MB_LIMIT = 
"storm.worker.cgroup.memory.mb.limit";
 
     /**
-     * the manually set cpu share for each CGroup on supervisor node
+     * the manually set cpu share for each CGroup on supervisor node.
      */
     @isPositiveNumber
     public static String STORM_WORKER_CGROUP_CPU_LIMIT = 
"storm.worker.cgroup.cpu.limit";
 
     /**
-     * full path to cgexec command
+     * full path to cgexec command.
      */
     @isString
     public static String STORM_CGROUP_CGEXEC_CMD = "storm.cgroup.cgexec.cmd";

http://git-wip-us.apache.org/repos/asf/storm/blob/f1f95449/storm-server/src/main/java/org/apache/storm/scheduler/multitenant/MultitenantScheduler.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/scheduler/multitenant/MultitenantScheduler.java
 
b/storm-server/src/main/java/org/apache/storm/scheduler/multitenant/MultitenantScheduler.java
index 95cbf63..8bd9035 100644
--- 
a/storm-server/src/main/java/org/apache/storm/scheduler/multitenant/MultitenantScheduler.java
+++ 
b/storm-server/src/main/java/org/apache/storm/scheduler/multitenant/MultitenantScheduler.java
@@ -20,44 +20,66 @@ package org.apache.storm.scheduler.multitenant;
 
 import java.util.HashMap;
 import java.util.Map;
-
 import org.apache.storm.DaemonConfig;
-import org.apache.storm.utils.Utils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.storm.Config;
 import org.apache.storm.scheduler.Cluster;
 import org.apache.storm.scheduler.IScheduler;
 import org.apache.storm.scheduler.Topologies;
 import org.apache.storm.scheduler.TopologyDetails;
+import org.apache.storm.scheduler.utils.ConfigLoaderFactoryService;
+import org.apache.storm.scheduler.utils.IConfigLoader;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class MultitenantScheduler implements IScheduler {
   private static final Logger LOG = 
LoggerFactory.getLogger(MultitenantScheduler.class);
   @SuppressWarnings("rawtypes")
-  private Map _conf;
+  private Map conf;
+  protected IConfigLoader configLoader;
   
   @Override
-  public void prepare(@SuppressWarnings("rawtypes") Map<String, Object> conf) {
-    _conf = conf;
+  public void prepare(Map<String, Object> conf) {
+    this.conf = conf;
+    configLoader = ConfigLoaderFactoryService.createConfigLoader(conf);
+
   }
- 
+
+  /**
+   * Load from configLoaders first; if no config available, read from 
multitenant-scheduler.yaml;
+   * if no config available from multitenant-scheduler.yaml, get configs from 
conf. Only one will be used.
+   * @return User pool configs.
+   */
   private Map<String, Number> getUserConf() {
-    Map<String, Number> ret = (Map<String, 
Number>)_conf.get(DaemonConfig.MULTITENANT_SCHEDULER_USER_POOLS);
-    if (ret == null) {
-      ret = new HashMap<>();
-    } else {
-      ret = new HashMap<>(ret);
+    Map<String, Number> ret;
+
+    // Try the loader plugin, if configured
+    if (configLoader != null) {
+      ret = (Map<String, Number>) 
configLoader.load(DaemonConfig.MULTITENANT_SCHEDULER_USER_POOLS);
+      if (ret != null) {
+        return ret;
+      } else {
+        LOG.warn("Config loader returned null. Will try to read from 
multitenant-scheduler.yaml");
+      }
     }
 
+    // If that fails, fall back on the multitenant-scheduler.yaml file
     Map fromFile = Utils.findAndReadConfigFile("multitenant-scheduler.yaml", 
false);
-    Map<String, Number> tmp = (Map<String, 
Number>)fromFile.get(DaemonConfig.MULTITENANT_SCHEDULER_USER_POOLS);
-    if (tmp != null) {
-      ret.putAll(tmp);
+    ret = (Map<String, 
Number>)fromFile.get(DaemonConfig.MULTITENANT_SCHEDULER_USER_POOLS);
+    if (ret != null) {
+      return ret;
+    } else {
+        LOG.warn("Reading from multitenant-scheduler.yaml returned null. This 
could because the file is not available. "
+                + "Will load configs from storm configuration");
     }
-    return ret;
-  }
 
+    // If that fails, use config
+    ret = (Map<String, Number>) 
conf.get(DaemonConfig.MULTITENANT_SCHEDULER_USER_POOLS);
+    if (ret == null) {
+      return new HashMap<>();
+    } else {
+      return ret;
+    }
+  }
 
   @Override
   public Map<String, Object> config() {

http://git-wip-us.apache.org/repos/asf/storm/blob/f1f95449/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
 
b/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
index 19838f4..669079b 100644
--- 
a/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
+++ 
b/storm-server/src/main/java/org/apache/storm/scheduler/resource/ResourceAwareScheduler.java
@@ -20,7 +20,6 @@ package org.apache.storm.scheduler.resource;
 
 import java.util.HashMap;
 import java.util.Map;
-
 import org.apache.storm.Config;
 import org.apache.storm.DaemonConfig;
 import org.apache.storm.scheduler.Cluster;
@@ -31,8 +30,10 @@ import org.apache.storm.scheduler.TopologyDetails;
 import 
org.apache.storm.scheduler.resource.strategies.eviction.IEvictionStrategy;
 import 
org.apache.storm.scheduler.resource.strategies.priority.ISchedulingPriorityStrategy;
 import org.apache.storm.scheduler.resource.strategies.scheduling.IStrategy;
+import org.apache.storm.scheduler.utils.ConfigLoaderFactoryService;
 import org.apache.storm.utils.ReflectionUtils;
 import org.apache.storm.utils.DisallowedStrategyException;
+import org.apache.storm.scheduler.utils.IConfigLoader;
 import org.apache.storm.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -42,14 +43,16 @@ public class ResourceAwareScheduler implements IScheduler {
     private Map<String, Object> conf;
     private ISchedulingPriorityStrategy schedulingPrioritystrategy;
     private IEvictionStrategy evictionStrategy;
+    private IConfigLoader configLoader;
 
     @Override
     public void prepare(Map<String, Object> conf) {
         this.conf = conf;
         schedulingPrioritystrategy = (ISchedulingPriorityStrategy) 
ReflectionUtils.newInstance(
-            (String) 
conf.get(DaemonConfig.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY));
+                (String) 
conf.get(DaemonConfig.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY));
         evictionStrategy = (IEvictionStrategy) ReflectionUtils.newInstance(
-            (String) 
conf.get(DaemonConfig.RESOURCE_AWARE_SCHEDULER_EVICTION_STRATEGY));
+                (String) 
conf.get(DaemonConfig.RESOURCE_AWARE_SCHEDULER_EVICTION_STRATEGY));
+        configLoader = ConfigLoaderFactoryService.createConfigLoader(conf);
     }
 
     @Override
@@ -75,7 +78,7 @@ public class ResourceAwareScheduler implements IScheduler {
                 td = 
schedulingPrioritystrategy.getNextTopologyToSchedule(cluster, userMap);
             } catch (Exception ex) {
                 LOG.error("Exception thrown when running priority strategy {}. 
No topologies will be scheduled!",
-                    schedulingPrioritystrategy.getClass().getName(), ex);
+                        schedulingPrioritystrategy.getClass().getName(), ex);
                 break;
             }
             if (td == null) {
@@ -104,8 +107,8 @@ public class ResourceAwareScheduler implements IScheduler {
         } catch (DisallowedStrategyException e) {
             topologySubmitter.markTopoUnsuccess(td);
             cluster.setStatus(td.getId(), "Unsuccessful in scheduling - " + 
e.getAttemptedClass()
-                              + " is not an allowed strategy. Please make sure 
your " + Config.TOPOLOGY_SCHEDULER_STRATEGY
-                              + " config is one of the allowed strategies: " + 
e.getAllowedStrategies().toString());
+                    + " is not an allowed strategy. Please make sure your " + 
Config.TOPOLOGY_SCHEDULER_STRATEGY
+                    + " config is one of the allowed strategies: " + 
e.getAllowedStrategies().toString());
             return;
         } catch (RuntimeException e) {
             LOG.error("failed to create instance of IStrategy: {} Topology {} 
will not be scheduled.",
@@ -115,7 +118,7 @@ public class ResourceAwareScheduler implements IScheduler {
                     + strategyConf + ". Please check logs for details");
             return;
         }
-       
+
         while (true) {
             // A copy of the cluster that restricts the strategy to only 
modify a single topology
             SingleTopologyCluster toSchedule = new 
SingleTopologyCluster(workingState, td.getId());
@@ -139,8 +142,8 @@ public class ResourceAwareScheduler implements IScheduler {
                         LOG.error("Unsuccessful attempting to assign executors 
to nodes.", ex);
                         topologySubmitter.markTopoUnsuccess(td);
                         cluster.setStatus(td.getId(), "Unsuccessful in 
scheduling - "
-                            + "IllegalStateException thrown when attempting to 
assign executors to nodes. Please check"
-                            + " log for details.");
+                                + "IllegalStateException thrown when 
attempting to assign executors to nodes. Please check"
+                                + " log for details.");
                     }
                     return;
                 } else {
@@ -151,8 +154,8 @@ public class ResourceAwareScheduler implements IScheduler {
                             madeSpace = evictionStrategy.makeSpaceForTopo(td, 
workingState, userMap);
                         } catch (Exception ex) {
                             LOG.error("Exception thrown when running eviction 
strategy {} to schedule topology {}."
-                                    + " No evictions will be done!", 
evictionStrategy.getClass().getName(),
-                                td.getName(), ex);
+                                            + " No evictions will be done!", 
evictionStrategy.getClass().getName(),
+                                    td.getName(), ex);
                             topologySubmitter.markTopoUnsuccess(td);
                             return;
                         }
@@ -160,7 +163,7 @@ public class ResourceAwareScheduler implements IScheduler {
                             LOG.debug("Could not make space for topo {} will 
move to attempted", td);
                             topologySubmitter.markTopoUnsuccess(td);
                             cluster.setStatus(td.getId(), "Not enough 
resources to schedule - "
-                                + result.getErrorMessage());
+                                    + result.getErrorMessage());
                             return;
                         }
                         continue;
@@ -171,7 +174,7 @@ public class ResourceAwareScheduler implements IScheduler {
                 }
             } else {
                 LOG.warn("Scheduling results returned from topology {} is not 
vaild! Topology with be ignored.",
-                    td.getName());
+                        td.getName());
                 topologySubmitter.markTopoUnsuccess(td, cluster);
                 return;
             }
@@ -202,19 +205,11 @@ public class ResourceAwareScheduler implements IScheduler 
{
         return userMap;
     }
 
-    /**
-     * Get resource guarantee configs.
-     *
-     * @return a map that contains resource guarantees of every user of the 
following format
-     *     {userid->{resourceType->amountGuaranteed}}
-     */
-    private Map<String, Map<String, Double>> getUserResourcePools() {
-        Map<String, Map<String, Number>> raw =
-            (Map<String, Map<String, Number>>) 
conf.get(DaemonConfig.RESOURCE_AWARE_SCHEDULER_USER_POOLS);
-        Map<String, Map<String, Double>> ret = new HashMap<>();
+    private Map<String, Map<String, Double>> convertToDouble(Map<String, 
Map<String, Number>> raw) {
+        Map<String, Map<String, Double>> ret = new HashMap<String, Map<String, 
Double>>();
 
         if (raw != null) {
-            for (Map.Entry<String, Map<String, Number>> userPoolEntry :  
raw.entrySet()) {
+            for (Map.Entry<String, Map<String, Number>> userPoolEntry : 
raw.entrySet()) {
                 String user = userPoolEntry.getKey();
                 ret.put(user, new HashMap<String, Double>());
                 for (Map.Entry<String, Number> resourceEntry : 
userPoolEntry.getValue().entrySet()) {
@@ -223,18 +218,42 @@ public class ResourceAwareScheduler implements IScheduler 
{
             }
         }
 
-        Map fromFile = Utils.findAndReadConfigFile("user-resource-pools.yaml", 
false);
-        Map<String, Map<String, Number>> tmp =
-            (Map<String, Map<String, Number>>) 
fromFile.get(DaemonConfig.RESOURCE_AWARE_SCHEDULER_USER_POOLS);
-        if (tmp != null) {
-            for (Map.Entry<String, Map<String, Number>> userPoolEntry : 
tmp.entrySet()) {
-                String user = userPoolEntry.getKey();
-                ret.put(user, new HashMap<String, Double>());
-                for (Map.Entry<String, Number> resourceEntry : 
userPoolEntry.getValue().entrySet()) {
-                    ret.get(user).put(resourceEntry.getKey(), 
resourceEntry.getValue().doubleValue());
-                }
+        return ret;
+    }
+
+    /**
+     * Get resource guarantee configs.
+     * Load from configLoaders first; if no config available, read from 
user-resource-pools.yaml;
+     * if no config available from user-resource-pools.yaml, get configs from 
conf. Only one will be used.
+     * @return a map that contains resource guarantees of every user of the 
following format
+     * {userid->{resourceType->amountGuaranteed}}
+     */
+    private Map<String, Map<String, Double>> getUserResourcePools() {
+        Map<String, Map<String, Number>> raw;
+
+        // Try the loader plugin, if configured
+        if (configLoader != null) {
+            raw = (Map<String, Map<String, Number>>) 
configLoader.load(DaemonConfig.RESOURCE_AWARE_SCHEDULER_USER_POOLS);
+            if (raw != null) {
+                return convertToDouble(raw);
+            } else {
+                LOG.warn("Config loader returned null. Will try to read from 
user-resource-pools.yaml");
             }
         }
-        return ret;
+
+        // if no configs from loader, try to read from user-resource-pools.yaml
+        Map fromFile = Utils.findAndReadConfigFile("user-resource-pools.yaml", 
false);
+        raw = (Map<String, Map<String, Number>>) 
fromFile.get(DaemonConfig.RESOURCE_AWARE_SCHEDULER_USER_POOLS);
+        if (raw != null) {
+            return convertToDouble(raw);
+        } else {
+            LOG.warn("Reading from user-resource-pools.yaml returned null. 
This could because the file is not available. "
+                    + "Will load configs from storm configuration");
+        }
+
+        // if no configs from user-resource-pools.yaml, get configs from conf
+        raw = (Map<String, Map<String, Number>>) 
conf.get(DaemonConfig.RESOURCE_AWARE_SCHEDULER_USER_POOLS);
+
+        return convertToDouble(raw);
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/f1f95449/storm-server/src/main/java/org/apache/storm/scheduler/utils/ArtifactoryConfigLoader.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/scheduler/utils/ArtifactoryConfigLoader.java
 
b/storm-server/src/main/java/org/apache/storm/scheduler/utils/ArtifactoryConfigLoader.java
new file mode 100644
index 0000000..3afa903
--- /dev/null
+++ 
b/storm-server/src/main/java/org/apache/storm/scheduler/utils/ArtifactoryConfigLoader.java
@@ -0,0 +1,395 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.utils;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Map;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.ResponseHandler;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.util.EntityUtils;
+import org.apache.storm.Config;
+import org.apache.storm.DaemonConfig;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+import org.json.simple.JSONArray;
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
+import org.json.simple.parser.ParseException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.yaml.snakeyaml.Yaml;
+import org.yaml.snakeyaml.constructor.SafeConstructor;
+
+/**
+ * A dynamic loader that can load scheduler configurations for user resource 
guarantees from Artifactory (an artifact repository manager).
+ */
+public class ArtifactoryConfigLoader implements IConfigLoader {
+    // Name of the sub-directory (created under <storm.local.dir>/nimbus) that
+    // holds locally cached copies of downloaded artifacts.
+    protected static final String LOCAL_ARTIFACT_DIR = "scheduler_artifacts";
+    // File name of the cached artifact inside localCacheDir.
+    static final String cacheFilename = "latest.yaml";
+    // Default URL path prefix; overridden by
+    // SCHEDULER_CONFIG_LOADER_ARTIFACTORY_BASE_DIRECTORY when configured.
+    private static final String DEFAULT_ARTIFACTORY_BASE_DIRECTORY = 
"/artifactory";
+    // Default number of seconds a previously loaded map is reused before
+    // the remote location is queried again.
+    private static final int DEFAULT_POLLTIME_SECS = 600;
+    // Default HTTP timeout, in seconds.
+    private static final int DEFAULT_TIMEOUT_SECS = 10;
+    // Prefix stripped from the configured URI scheme to obtain the scheme
+    // used for the actual request.
+    private static final String ARTIFACTORY_SCHEME_PREFIX = "artifactory+";
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(ArtifactoryConfigLoader.class);
+
+    // The storm configuration handed to the constructor.
+    private Map<String, Object> conf;
+    private int artifactoryPollTimeSecs = DEFAULT_POLLTIME_SECS;
+    // Set once makeArtifactoryCache() has prepared the local cache directory.
+    private boolean cacheInitialized = false;
+    // Directory holding the cached artifact; its name is derived from the
+    // artifact's location on the server.
+    private String localCacheDir;
+    private String baseDirectory = DEFAULT_ARTIFACTORY_BASE_DIRECTORY;
+    // Time (seconds, from Time.currentTimeSecs()) of the last update of
+    // lastReturnedValue.
+    private int lastReturnedTime = 0;
+    private int timeoutSeconds = DEFAULT_TIMEOUT_SECS;
+    // The most recently loaded configuration map; reused until the poll
+    // interval has elapsed.
+    private Map lastReturnedValue;
+    // Parsed SCHEDULER_CONFIG_LOADER_URI; null when it is missing or invalid.
+    private URI targetURI = null;
+    private JSONParser jsonParser;
+    // Scheme used for requests (the URI scheme without the "artifactory+"
+    // prefix).
+    private String scheme;
+
+    /**
+     * Builds a loader from the storm configuration.
+     *
+     * <p>Reads the optional timeout, poll interval and base directory
+     * settings, then parses SCHEDULER_CONFIG_LOADER_URI. When the URI is
+     * missing, malformed, or its scheme does not start with "artifactory+",
+     * an error is logged and targetURI stays null, so that load() returns
+     * null instead of the constructor throwing.
+     *
+     * @param conf The storm configuration.
+     */
+    public ArtifactoryConfigLoader(Map<String, Object> conf) {
+        this.conf = conf;
+        // Cast through Number so that a Long (or other numeric) value does
+        // not raise a ClassCastException.
+        Number thisTimeout =
+            (Number) conf.get(DaemonConfig.SCHEDULER_CONFIG_LOADER_TIMEOUT_SECS);
+        if (thisTimeout != null) {
+            timeoutSeconds = thisTimeout.intValue();
+        }
+        Number thisPollTime =
+            (Number) conf.get(DaemonConfig.SCHEDULER_CONFIG_LOADER_POLLTIME_SECS);
+        if (thisPollTime != null) {
+            artifactoryPollTimeSecs = thisPollTime.intValue();
+        }
+        String thisBase = (String) conf.get(
+            DaemonConfig.SCHEDULER_CONFIG_LOADER_ARTIFACTORY_BASE_DIRECTORY);
+        if (thisBase != null) {
+            baseDirectory = thisBase;
+        }
+        String uriString =
+            (String) conf.get(DaemonConfig.SCHEDULER_CONFIG_LOADER_URI);
+        if (uriString == null) {
+            LOG.error("No URI defined in {} configuration.",
+                DaemonConfig.SCHEDULER_CONFIG_LOADER_URI);
+        } else {
+            try {
+                URI parsed = new URI(uriString);
+                String fullScheme = parsed.getScheme();
+                int prefixLen = ARTIFACTORY_SCHEME_PREFIX.length();
+                // getScheme() is null for scheme-less URIs, and a scheme
+                // without the expected prefix cannot be stripped safely.
+                if (fullScheme != null
+                        && fullScheme.length() > prefixLen
+                        && fullScheme.regionMatches(
+                            true, 0, ARTIFACTORY_SCHEME_PREFIX, 0, prefixLen)) {
+                    targetURI = parsed;
+                    scheme = fullScheme.substring(prefixLen);
+                } else {
+                    LOG.error("Unsupported scheme in uri={}; expected {}http or {}https",
+                        uriString, ARTIFACTORY_SCHEME_PREFIX, ARTIFACTORY_SCHEME_PREFIX);
+                }
+            } catch (URISyntaxException e) {
+                LOG.error("Failed to parse uri={}", uriString, e);
+            }
+        }
+        jsonParser = new JSONParser();
+    }
+
+    /**
+     * Load the configs associated with the configKey from the targetURI.
+     *
+     * <p>A previously loaded map is reused while it is younger than the
+     * poll interval; otherwise the remote location is queried again.
+     *
+     * @param configKey The key from which we want to get the scheduler config.
+     * @return The scheduler configuration if exists; null otherwise.
+     */
+    @Override
+    public Map load(String configKey) {
+        if (targetURI == null) {
+            return null;
+        }
+
+        // Check for new file every so often
+        int currentTimeSecs = Time.currentTimeSecs();
+        if (lastReturnedValue != null
+                && ((currentTimeSecs - lastReturnedTime) < artifactoryPollTimeSecs)) {
+            LOG.debug("currentTimeSecs: {}; lastReturnedTime {}; "
+                    + "artifactoryPollTimeSecs: {}. Returning our last map.",
+                currentTimeSecs, lastReturnedTime, artifactoryPollTimeSecs);
+            return (Map) lastReturnedValue.get(configKey);
+        }
+
+        try {
+            Map raw = loadFromURI(targetURI);
+            if (raw != null) {
+                return (Map) raw.get(configKey);
+            }
+        } catch (Exception e) {
+            // Keep the cause in the log; without it a failure here cannot
+            // be diagnosed.
+            LOG.error("Failed to load from uri {}", targetURI, e);
+        }
+        return null;
+    }
+
+
+    /**
+     * A private class used to check the response coming back from httpclient.
+     */
+    private static class GETStringResponseHandler implements 
ResponseHandler<String> {
+        private static GETStringResponseHandler singleton = null;
+
+        /**
+         * @return a singleton httpclient GET response handler
+         */
+        public static GETStringResponseHandler getInstance() {
+            if (singleton == null) {
+                singleton = new GETStringResponseHandler();
+            }
+            return singleton;
+        }
+
+        /**
+         * @param response The http response to verify.
+         * @return null on failure or the response string if return code is in 
200 range
+         */
+        @Override
+        public String handleResponse(final HttpResponse response) throws 
IOException {
+            int status = response.getStatusLine().getStatusCode();
+            HttpEntity entity = response.getEntity();
+            String entityString = (entity != null ? 
EntityUtils.toString(entity) : null);
+            if (status >= 200 && status < 300) {
+                return entityString;
+            } else {
+                LOG.error("Got unexpected response code {}; entity: {}", 
status, entityString);
+                return null;
+            }
+        }
+    }
+
+    /**
+     * Issues an HTTP GET against the Artifactory server.
+     *
+     * @param api null if we are trying to download artifact, otherwise a
+     *        string to call REST api, e.g. "/api/storage"
+     * @param artifact location of artifact
+     * @param host Artifactory hostname
+     * @param port Artifactory port
+     * @return null on failure or the response string if return code is in
+     *         200 range
+     *
+     * <p>Protected so we can override this in unit tests
+     */
+    protected String doGet(String api, String artifact, String host, Integer port) {
+        URIBuilder builder = new URIBuilder().setScheme(scheme).setHost(host).setPort(port);
+
+        String path;
+        if (api != null) {
+            path = baseDirectory + "/" + api + "/" + artifact;
+        } else {
+            path = baseDirectory + "/" + artifact;
+        }
+
+        // Get rid of multiple '/' in url
+        path = path.replaceAll("/[/]+", "/");
+        builder.setPath(path);
+
+        int timeoutMillis = timeoutSeconds * 1000;
+        // Bound the read as well as the connect phase, so that a server
+        // which accepts the connection but never answers cannot block the
+        // caller indefinitely.
+        RequestConfig requestConfig = RequestConfig.custom()
+            .setConnectTimeout(timeoutMillis)
+            .setSocketTimeout(timeoutMillis)
+            .build();
+
+        String returnValue = null;
+        // try-with-resources closes the client and releases its connections.
+        try (org.apache.http.impl.client.CloseableHttpClient httpclient =
+                 HttpClientBuilder.create().setDefaultRequestConfig(requestConfig).build()) {
+            LOG.debug("About to issue a GET to {}", builder);
+            HttpGet httpget = new HttpGet(builder.build());
+            returnValue = httpclient.execute(httpget, GETStringResponseHandler.getInstance());
+        } catch (Exception e) {
+            // returnValue keeps whatever was obtained before the failure
+            // (null when the request itself failed).
+            LOG.error("Received exception while connecting to Artifactory", e);
+        }
+
+        LOG.debug("Returning {}", returnValue);
+        return returnValue;
+    }
+
+    private JSONObject getArtifactMetadata(String location, String host, 
Integer port) {
+        String metadataStr = null;
+
+        metadataStr = doGet("/api/storage", location, host, port);
+
+        if (metadataStr == null) {
+            return null;
+        }
+
+        JSONObject returnValue;
+        try {
+            returnValue = (JSONObject) jsonParser.parse(metadataStr);
+        } catch (ParseException e) {
+            LOG.error("Could not parse JSON string {}", metadataStr, e);
+            return null;
+        }
+
+        return returnValue;
+    }
+
+    /**
+     * Orders directory entries by the string value of their "uri" field.
+     * Declared static because it uses nothing from the enclosing instance.
+     */
+    private static class DirEntryCompare implements Comparator<JSONObject> {
+        @Override
+        public int compare(JSONObject o1, JSONObject o2) {
+            return ((String) o1.get("uri")).compareTo((String) o2.get("uri"));
+        }
+    }
+
+    private String loadMostRecentArtifact(String location, String host, 
Integer port) {
+        // Is this a directory or is it a file?
+        JSONObject json = getArtifactMetadata(location, host, port);
+        if (json == null) {
+            LOG.error("got null metadata");
+            return null;
+        }
+        String downloadURI = (String) json.get("downloadUri");
+
+        // This means we are pointing at a file.
+        if (downloadURI != null) {
+            // Then get it and return the file as string.
+            String returnValue = doGet(null, location, host, port);
+            saveInArtifactoryCache(returnValue);
+            return returnValue;
+        }
+
+        // This should mean that we were pointed at a directory.  
+        // Find the most recent child and load that.
+        JSONArray msg = (JSONArray) json.get("children");
+        if (msg == null || msg.size() == 0) {
+            LOG.error("Expected directory children not present");
+            return null;
+        }
+        JSONObject newest = (JSONObject) Collections.max(msg, new 
DirEntryCompare());
+        if (newest == null) {
+            LOG.error("Failed to find most recent artifact uri in {}", 
location);
+            return null;
+        }
+
+        String uri = (String) newest.get("uri");
+        if (uri == null) {
+            LOG.error("Expected directory uri not present");
+            return null;
+        }
+        String returnValue = doGet(null, location + uri, host, port);
+        saveInArtifactoryCache(returnValue);
+        return returnValue;
+    }
+
+    /**
+     * Remembers the given map, together with the current time, as the most
+     * recently loaded value.
+     *
+     * @param ret the map that was just loaded (may be null)
+     */
+    private void updateLastReturned(Map ret) {
+        this.lastReturnedValue = ret;
+        this.lastReturnedTime = Time.currentTimeSecs();
+    }
+
+    /**
+     * Reads a YAML file into a map.
+     *
+     * @param file the file to read
+     * @return the parsed contents, or null if nothing could be read
+     */
+    private Map loadFromFile(File file) {
+        try {
+            // Resolve the path once and reuse it for reading and logging.
+            String path = file.getCanonicalPath();
+            Map ret = (Map) Utils.readYamlFile(path);
+            if (ret != null) {
+                LOG.debug("returning a new map from file {}", path);
+            }
+            return ret;
+        } catch (IOException e) {
+            // Pass the exception itself so the stack trace is not lost.
+            LOG.error("Failed to load from file {}.", file, e);
+            return null;
+        }
+    }
+
+    /**
+     * Loads the last artifact saved in the local cache.
+     *
+     * @return the cached configuration, or null if there is no usable cache
+     */
+    private Map getLatestFromCache() {
+        // Without a cache directory the concatenation below would produce
+        // the meaningless relative path "null/latest.yaml".
+        if (localCacheDir == null) {
+            LOG.warn("No local cache directory available; nothing to load from cache");
+            return null;
+        }
+        String localFileName = localCacheDir + File.separator + cacheFilename;
+        return loadFromFile(new File(localFileName));
+    }
+
+    /**
+     * Writes the downloaded YAML into the local cache file. Failures are
+     * logged and otherwise ignored (the cache is best effort).
+     *
+     * @param yamlData the artifact contents; null is not saved
+     */
+    private void saveInArtifactoryCache(String yamlData) {
+        if (yamlData == null) {
+            LOG.warn("Will not save null data into the artifactory cache");
+            return;
+        }
+        // Without a cache directory the concatenation below would target
+        // the meaningless relative path "null/latest.yaml".
+        if (localCacheDir == null) {
+            LOG.warn("No local cache directory available; not caching the artifact");
+            return;
+        }
+
+        String localFileName = localCacheDir + File.separator + cacheFilename;
+
+        File cacheFile = new File(localFileName);
+        try (FileOutputStream fos = new FileOutputStream(cacheFile)) {
+            fos.write(yamlData.getBytes());
+            fos.flush();
+        } catch (IOException e) {
+            LOG.error("Received exception when writing file {}.  Attempting delete",
+                localFileName, e);
+            // Remove a possibly half-written file so it is not read back
+            // later as if it were a valid cache entry.
+            try {
+                if (!cacheFile.delete() && cacheFile.exists()) {
+                    LOG.error("Could not delete file {}.", localFileName);
+                }
+            } catch (Exception deleteException) {
+                LOG.error("Received exception when deleting file {}.",
+                    localFileName, deleteException);
+            }
+        }
+    }
+
+    /**
+     * Makes sure the local cache directory for the given artifact location
+     * exists. Does nothing when STORM_LOCAL_DIR is not configured. The cache
+     * is only marked as initialized when the directory is really there, so
+     * a failed attempt is retried on the next load.
+     *
+     * @param location path of the artifact on the server; used (with '/'
+     *        replaced by '_') as the name of the cache directory
+     */
+    private void makeArtifactoryCache(String location) {
+        String localDirName = (String) conf.get(Config.STORM_LOCAL_DIR);
+        if (localDirName == null) {
+            return;
+        }
+
+        String artifactRoot = localDirName + File.separator + "nimbus"
+            + File.separator + LOCAL_ARTIFACT_DIR;
+
+        // The location comes from a URI path, whose separator is always '/'.
+        // A literal replace is used because replaceAll() takes a regex, and
+        // File.separator ("\" on Windows) is not a valid one.
+        localCacheDir = artifactRoot + File.separator + location.replace("/", "_");
+
+        // mkdirs() creates the parent directories as well; it returns false
+        // when the directory already exists, hence the isDirectory() check.
+        File dir = new File(localCacheDir);
+        if (!dir.mkdirs() && !dir.isDirectory()) {
+            LOG.error("Could not create artifactory cache directory {}", localCacheDir);
+            return;
+        }
+        cacheInitialized = true;
+    }
+
+    /**
+     * Loads the configuration map from the given URI, falling back to the
+     * local cache when nothing can be downloaded.
+     *
+     * @param uri the location of the artifact or artifact directory
+     * @return the parsed configuration, or null if none could be obtained
+     */
+    private Map loadFromURI(URI uri) {
+        String host = uri.getHost();
+        Integer port = uri.getPort();
+        String location = uri.getPath();
+        // Strip a leading base directory, ignoring case. regionMatches
+        // compares character by character, so unlike toLowerCase() it does
+        // not depend on the default locale and cannot change the length of
+        // the strings being compared.
+        if (location.regionMatches(true, 0, baseDirectory, 0, baseDirectory.length())) {
+            location = location.substring(baseDirectory.length());
+        }
+
+        if (!cacheInitialized) {
+            makeArtifactoryCache(location);
+        }
+
+        // Get the most recent artifact as a String, and then parse the yaml
+        String yamlConfig = loadMostRecentArtifact(location, host, port);
+
+        // If we failed to get anything from Artifactory try to get it from
+        // our local cache
+        if (yamlConfig == null) {
+            Map ret = getLatestFromCache();
+            updateLastReturned(ret);
+            return ret;
+        }
+
+        // Now parse it and return the map.
+        Yaml yaml = new Yaml(new SafeConstructor());
+        Map ret;
+        try {
+            ret = (Map) yaml.load(yamlConfig);
+        } catch (Exception e) {
+            // Log the cause; the message alone says nothing about what is
+            // wrong with the document.
+            LOG.error("Could not parse yaml.", e);
+            return null;
+        }
+
+        if (ret != null) {
+            LOG.debug("returning a new map from Artifactory");
+            updateLastReturned(ret);
+        }
+        return ret;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/f1f95449/storm-server/src/main/java/org/apache/storm/scheduler/utils/ArtifactoryConfigLoaderFactory.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/scheduler/utils/ArtifactoryConfigLoaderFactory.java
 
b/storm-server/src/main/java/org/apache/storm/scheduler/utils/ArtifactoryConfigLoaderFactory.java
new file mode 100644
index 0000000..6494ae2
--- /dev/null
+++ 
b/storm-server/src/main/java/org/apache/storm/scheduler/utils/ArtifactoryConfigLoaderFactory.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.utils;
+
+import com.google.auto.service.AutoService;
+import java.net.URI;
+import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Factory that produces ArtifactoryConfigLoader instances for URIs using an
+ * "artifactory+http" or "artifactory+https" scheme.
+ */
+@AutoService(IConfigLoaderFactory.class)
+public class ArtifactoryConfigLoaderFactory implements IConfigLoaderFactory {
+    private static final Logger LOG =
+        LoggerFactory.getLogger(ArtifactoryConfigLoaderFactory.class);
+
+    /**
+     * Creates an ArtifactoryConfigLoader when the URI scheme is one this
+     * factory handles (compared ignoring case).
+     *
+     * @param uri The URI pointing to the config file/directory location on
+     *        the Artifactory server.
+     * @param conf The storm configuration.
+     * @return An ArtifactoryConfigLoader if the scheme is "artifactory+http"
+     *         or "artifactory+https"; otherwise, null.
+     */
+    @Override
+    public IConfigLoader createIfSupported(URI uri, Map<String, Object> conf) {
+        final String uriScheme = uri.getScheme();
+        final boolean supported = "artifactory+http".equalsIgnoreCase(uriScheme)
+            || "artifactory+https".equalsIgnoreCase(uriScheme);
+        if (!supported) {
+            LOG.debug("scheme {} not supported in this factory.", uriScheme);
+            return null;
+        }
+        return new ArtifactoryConfigLoader(conf);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/f1f95449/storm-server/src/main/java/org/apache/storm/scheduler/utils/ConfigLoaderFactoryService.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/scheduler/utils/ConfigLoaderFactoryService.java
 
b/storm-server/src/main/java/org/apache/storm/scheduler/utils/ConfigLoaderFactoryService.java
new file mode 100644
index 0000000..53df78b
--- /dev/null
+++ 
b/storm-server/src/main/java/org/apache/storm/scheduler/utils/ConfigLoaderFactoryService.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.utils;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Map;
+import java.util.ServiceLoader;
+import org.apache.storm.DaemonConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The user interface to create a concrete IConfigLoader instance for use.
+ */
+public class ConfigLoaderFactoryService {
+
+    private static Logger LOG = 
LoggerFactory.getLogger(IConfigLoaderFactory.class);
+
+    /**
+     * SerivceLoader loads all the implementations of IConfigLoaderFactory for 
use.
+     */
+    private static ServiceLoader<IConfigLoaderFactory> serviceLoader = 
ServiceLoader.load(IConfigLoaderFactory.class);
+
+    /**
+     * The user interface to create an IConfigLoader instance.
+     * It iterates all the implementations of IConfigLoaderFactory and finds 
the one which supports the
+     * specific scheme of the URI and then uses it to create an IConfigLoader 
instance.
+     * @param conf The storm configuration.
+     * @return A concrete IConfigLoader implementation which supports the 
scheme of the URI.
+     *         If multiple implementations are available, return the first 
one; otherwise, return null.
+     */
+    public static IConfigLoader createConfigLoader(Map<String, Object> conf) {
+        String uriString = (String) 
conf.get(DaemonConfig.SCHEDULER_CONFIG_LOADER_URI);
+        if (null != uriString) {
+            try {
+                URI uri = new URI(uriString);
+                for (IConfigLoaderFactory factory : serviceLoader) {
+                    IConfigLoader ret = factory.createIfSupported(uri, conf);
+                    if (ret != null) {
+                        return ret;
+                    }
+                }
+            } catch (URISyntaxException e) {
+                LOG.error("Failed to parse uri={}", uriString);
+            }
+        } else {
+            LOG.debug("Config {} is not set.", 
DaemonConfig.SCHEDULER_CONFIG_LOADER_URI);
+        }
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/f1f95449/storm-server/src/main/java/org/apache/storm/scheduler/utils/FileConfigLoader.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/scheduler/utils/FileConfigLoader.java
 
b/storm-server/src/main/java/org/apache/storm/scheduler/utils/FileConfigLoader.java
new file mode 100644
index 0000000..e785e5c
--- /dev/null
+++ 
b/storm-server/src/main/java/org/apache/storm/scheduler/utils/FileConfigLoader.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.utils;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Map;
+
+import org.apache.storm.DaemonConfig;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Scheduler configuration loader which loads configs from a file.
+ */
+public class FileConfigLoader implements IConfigLoader {
+    private static final Logger LOG = 
LoggerFactory.getLogger(FileConfigLoader.class);
+
+    private Map<String, Object> conf;
+    private String targetFilePath = null;
+
+    public FileConfigLoader(Map<String, Object> conf) {
+        this.conf = conf;
+        String uriString = (String) 
conf.get(DaemonConfig.SCHEDULER_CONFIG_LOADER_URI);
+        if (uriString == null) {
+            LOG.error("No URI defined in {} configuration.", 
DaemonConfig.SCHEDULER_CONFIG_LOADER_URI);
+        } else {
+            try {
+                targetFilePath = new URI(uriString).getPath();
+            } catch (URISyntaxException e) {
+                LOG.error("Failed to parse uri={}", uriString);
+            }
+        }
+    }
+
+    /**
+     * Load the configs associated with the configKey from the targetFilePath.
+     * @param configKey The key from which we want to get the scheduler config.
+     * @return The scheduler configuration if exists; null otherwise.
+     */
+    @Override
+    public Map load(String configKey) {
+        if (targetFilePath != null) {
+            try {
+                Map raw = (Map) Utils.readYamlFile(targetFilePath);
+                if (raw != null) {
+                    return (Map) raw.get(configKey);
+                }
+            } catch (Exception e) {
+                LOG.error("Failed to load from file {}", targetFilePath);
+            }
+        }
+        return null;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/f1f95449/storm-server/src/main/java/org/apache/storm/scheduler/utils/FileConfigLoaderFactory.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/scheduler/utils/FileConfigLoaderFactory.java
 
b/storm-server/src/main/java/org/apache/storm/scheduler/utils/FileConfigLoaderFactory.java
new file mode 100644
index 0000000..9a2ac5c
--- /dev/null
+++ 
b/storm-server/src/main/java/org/apache/storm/scheduler/utils/FileConfigLoaderFactory.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.utils;
+
+import com.google.auto.service.AutoService;
+import java.net.URI;
+import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Factory that produces FileConfigLoader instances for "file" URIs.
+ */
+@AutoService(IConfigLoaderFactory.class)
+public class FileConfigLoaderFactory implements IConfigLoaderFactory {
+
+    private static final Logger LOG =
+        LoggerFactory.getLogger(FileConfigLoaderFactory.class);
+
+    /**
+     * Creates a FileConfigLoader when the URI scheme is "file" (compared
+     * ignoring case).
+     *
+     * @param uri The URI pointing to the config file location.
+     * @param conf The storm configuration.
+     * @return A FileConfigLoader if the scheme is "file"; otherwise, null.
+     */
+    @Override
+    public IConfigLoader createIfSupported(URI uri, Map<String, Object> conf) {
+        final String uriScheme = uri.getScheme();
+        if (!"file".equalsIgnoreCase(uriScheme)) {
+            LOG.debug("scheme {} not supported in this factory.", uriScheme);
+            return null;
+        }
+        return new FileConfigLoader(conf);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/f1f95449/storm-server/src/main/java/org/apache/storm/scheduler/utils/IConfigLoader.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/scheduler/utils/IConfigLoader.java
 
b/storm-server/src/main/java/org/apache/storm/scheduler/utils/IConfigLoader.java
new file mode 100644
index 0000000..5071133
--- /dev/null
+++ 
b/storm-server/src/main/java/org/apache/storm/scheduler/utils/IConfigLoader.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.utils;
+
+import java.util.Map;
+
+/**
+ * A source of scheduler configuration that can be queried by config key.
+ */
+public interface IConfigLoader {
+
+    /**
+     * Load scheduler configs associated with the configKey.
+     *
+     * @param configKey The key from which we want to get the scheduler config.
+     * @return The scheduler configs. The implementations in this package
+     *         return null when no configuration could be loaded, so callers
+     *         should be prepared for a null result.
+     */
+    Map<?,?> load(String configKey);
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/f1f95449/storm-server/src/main/java/org/apache/storm/scheduler/utils/IConfigLoaderFactory.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/main/java/org/apache/storm/scheduler/utils/IConfigLoaderFactory.java
 
b/storm-server/src/main/java/org/apache/storm/scheduler/utils/IConfigLoaderFactory.java
new file mode 100644
index 0000000..a039870
--- /dev/null
+++ 
b/storm-server/src/main/java/org/apache/storm/scheduler/utils/IConfigLoaderFactory.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.utils;
+
+import java.net.URI;
+import java.util.Map;
+
+public interface IConfigLoaderFactory {
+
+    /**
+     * Create an IConfigLoader implementation if the scheme of the URI is 
supported; otherwise returns null.
+     *
+     * @param uri  The URI of the config location.
+     * @param conf The storm configuration.
+     * @return A concrete implementation if the scheme is supported, or null 
if not.
+     */
+    IConfigLoader createIfSupported(URI uri, Map<String, Object> conf);
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/f1f95449/storm-server/src/test/java/org/apache/storm/scheduler/utils/ArtifactoryConfigLoaderTest.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/test/java/org/apache/storm/scheduler/utils/ArtifactoryConfigLoaderTest.java
 
b/storm-server/src/test/java/org/apache/storm/scheduler/utils/ArtifactoryConfigLoaderTest.java
new file mode 100644
index 0000000..009f479
--- /dev/null
+++ 
b/storm-server/src/test/java/org/apache/storm/scheduler/utils/ArtifactoryConfigLoaderTest.java
@@ -0,0 +1,235 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.utils;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.storm.Config;
+import org.apache.storm.DaemonConfig;
+import org.apache.storm.utils.Time;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.Before;
+import org.junit.After;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.File;
+import java.nio.file.Path;
+import java.nio.file.Files;
+import java.util.HashMap;
+import java.util.Map;
+
+public class ArtifactoryConfigLoaderTest {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(ArtifactoryConfigLoaderTest.class);
+    private Path tmpDirPath;
+    private static final String ARTIFACTORY_HTTP_SCHEME_PREFIX = 
"artifactory+http://";
+
+    private class ArtifactoryConfigLoaderMock extends ArtifactoryConfigLoader {
+        String getData;
+        HashMap<String, String> getDataMap = new HashMap<String, String>();
+
+        public ArtifactoryConfigLoaderMock(Map conf) {
+            super(conf);
+        }
+
+        public void setData(String api, String artifact, String data) {
+            if (api == null) {
+                getData = data;
+            }  else {
+                getDataMap.put(artifact, data);
+            }
+        }
+
+        @Override
+        protected String doGet(String api, String artifact, String host, 
Integer port) {
+            if (api == null) {
+                return getData;
+            }
+            return getDataMap.get(artifact);
+        }
+    };
+
+    @Before
+    public void createTempDir() throws Exception {
+        tmpDirPath = Files.createTempDirectory("TestArtifactoryConfigLoader");
+        File f = tmpDirPath.toFile();
+        f.mkdir();
+        File dir = new File(f, "nimbus");
+        dir.mkdir();
+    }
+
+    @After
+    public void removeTempDir() throws Exception {
+        FileUtils.deleteDirectory(tmpDirPath.toFile());
+    }
+
+    @Test
+    public void testInvalidConfig() {
+        Config conf = new Config();
+        ArtifactoryConfigLoaderMock loaderMock = new 
ArtifactoryConfigLoaderMock(conf);
+        Map ret = 
loaderMock.load(DaemonConfig.MULTITENANT_SCHEDULER_USER_POOLS);
+        Assert.assertNull("Unexpectedly returned not null", ret);
+    }
+
+    @Test
+    public void testPointingAtDirectory() {
+        // This is a test where we are configured to point right at an 
artifact dir
+        Config conf = new Config();
+        conf.put(DaemonConfig.SCHEDULER_CONFIG_LOADER_URI, 
ARTIFACTORY_HTTP_SCHEME_PREFIX + 
"bogushost.yahoo.com:9999/location/of/this/dir");
+        conf.put(Config.STORM_LOCAL_DIR, tmpDirPath.toString());
+
+        ArtifactoryConfigLoaderMock loaderMock = new 
ArtifactoryConfigLoaderMock(conf);
+
+        loaderMock.setData("Anything", "/location/of/this/dir",
+                "{\"children\" : [ { \"uri\" : \"/20160621204337.yaml\", 
\"folder\" : false }]}" );
+        loaderMock.setData(null, null, "{ \"" + 
DaemonConfig.MULTITENANT_SCHEDULER_USER_POOLS + "\": {one: 1, two: 2, three: 3, 
four : 4}}");
+
+        Map ret = 
loaderMock.load(DaemonConfig.MULTITENANT_SCHEDULER_USER_POOLS);
+        Assert.assertNotNull("Unexpectedly returned null", ret);
+        Assert.assertEquals(1, ret.get("one"));
+        Assert.assertEquals(2, ret.get("two"));
+        Assert.assertEquals(3, ret.get("three"));
+        Assert.assertEquals(4, ret.get("four"));
+
+        // Now let's load w/o setting up gets and we should still get valid 
map back
+        ArtifactoryConfigLoaderMock tc2 = new 
ArtifactoryConfigLoaderMock(conf);
+
+        Map ret2 = tc2.load(DaemonConfig.MULTITENANT_SCHEDULER_USER_POOLS);
+        Assert.assertNotNull("Unexpectedly returned null", ret2);
+        Assert.assertEquals(1, ret2.get("one"));
+        Assert.assertEquals(2, ret2.get("two"));
+        Assert.assertEquals(3, ret2.get("three"));
+        Assert.assertEquals(4, ret2.get("four"));
+    }
+
+    @Test
+    public void testArtifactUpdate() {
+        // This is a test where we are configured to point right at an 
artifact dir
+        Config conf = new Config();
+        conf.put(DaemonConfig.SCHEDULER_CONFIG_LOADER_URI,  
ARTIFACTORY_HTTP_SCHEME_PREFIX + 
"bogushost.yahoo.com:9999/location/of/test/dir");
+        conf.put(Config.STORM_LOCAL_DIR, tmpDirPath.toString());
+
+        Time.startSimulating();
+
+        try {
+            ArtifactoryConfigLoaderMock loaderMock = new 
ArtifactoryConfigLoaderMock(conf);
+
+            loaderMock.setData("Anything", "/location/of/test/dir",
+                    "{\"children\" : [ { \"uri\" : \"/20160621204337.yaml\", 
\"folder\" : false }]}" );
+            loaderMock.setData(null, null, "{ \"" + 
DaemonConfig.MULTITENANT_SCHEDULER_USER_POOLS + "\": {one: 1, two: 2, three: 
3}}");
+            Map ret = 
loaderMock.load(DaemonConfig.MULTITENANT_SCHEDULER_USER_POOLS);
+
+            Assert.assertNotNull("Unexpectedly returned null", ret);
+            Assert.assertEquals(1, ret.get("one"));
+            Assert.assertEquals(2, ret.get("two"));
+            Assert.assertEquals(3, ret.get("three"));
+            Assert.assertNull("Unexpectedly contained \"four\"", 
ret.get("four"));
+
+            // Now let's load w/o setting up gets and we should still get 
valid map back
+            ArtifactoryConfigLoaderMock tc2 = new 
ArtifactoryConfigLoaderMock(conf);
+            Map ret2 = tc2.load(DaemonConfig.MULTITENANT_SCHEDULER_USER_POOLS);
+            Assert.assertNotNull("Unexpectedly returned null", ret2);
+            Assert.assertEquals(1, ret2.get("one"));
+            Assert.assertEquals(2, ret2.get("two"));
+            Assert.assertEquals(3, ret2.get("three"));
+            Assert.assertNull("Unexpectedly did not return null", 
ret2.get("four"));
+
+            // Now let's update it, but not advance time.  Should get old map 
again.
+            loaderMock.setData("Anything", "/location/of/test/dir",
+                    "{\"children\" : [ { \"uri\" : \"/20160621204999.yaml\", 
\"folder\" : false }]}");
+            loaderMock.setData(null, null, "{ \"" + 
DaemonConfig.MULTITENANT_SCHEDULER_USER_POOLS + "\": {one: 1, two: 2, three: 3, 
four : 4}}");
+            ret = 
loaderMock.load(DaemonConfig.MULTITENANT_SCHEDULER_USER_POOLS);
+            Assert.assertNotNull("Unexpectedly returned null", ret);
+            Assert.assertEquals(1, ret.get("one"));
+            Assert.assertEquals(2, ret.get("two"));
+            Assert.assertEquals(3, ret.get("three"));
+            Assert.assertNull("Unexpectedly did not return null, not enough 
time passed!", ret.get("four"));
+
+            // Re-load from cached file.
+            ret2 = tc2.load(DaemonConfig.MULTITENANT_SCHEDULER_USER_POOLS);
+            Assert.assertNotNull("Unexpectedly returned null", ret2);
+            Assert.assertEquals(1, ret2.get("one"));
+            Assert.assertEquals(2, ret2.get("two"));
+            Assert.assertEquals(3, ret2.get("three"));
+            Assert.assertNull("Unexpectedly did not return null, last cached 
result should not have \"four\"", ret2.get("four"));
+
+            // Now, let's advance time.
+            Time.advanceTime(11*60*1000);
+
+            ret = 
loaderMock.load(DaemonConfig.MULTITENANT_SCHEDULER_USER_POOLS);
+            Assert.assertNotNull("Unexpectedly returned null", ret);
+            Assert.assertEquals(1, ret.get("one"));
+            Assert.assertEquals(2, ret.get("two"));
+            Assert.assertEquals(3, ret.get("three"));
+            Assert.assertEquals(4, ret.get("four"));
+
+            // Re-load from cached file.
+            ret2 = tc2.load(DaemonConfig.MULTITENANT_SCHEDULER_USER_POOLS);
+            Assert.assertNotNull("Unexpectedly returned null", ret2);
+            Assert.assertEquals(1, ret2.get("one"));
+            Assert.assertEquals(2, ret2.get("two"));
+            Assert.assertEquals(3, ret2.get("three"));
+            Assert.assertEquals(4, ret2.get("four"));
+        } finally {
+            Time.stopSimulating();
+        }
+    }
+
+    @Test
+    public void testPointingAtSpecificArtifact() {
+        // This is a test where we are configured to point right at a single 
artifact
+        Config conf = new Config();
+        conf.put(DaemonConfig.SCHEDULER_CONFIG_LOADER_URI, 
ARTIFACTORY_HTTP_SCHEME_PREFIX + 
"bogushost.yahoo.com:9999/location/of/this/artifact");
+        conf.put(Config.STORM_LOCAL_DIR, tmpDirPath.toString());
+
+        ArtifactoryConfigLoaderMock loaderMock = new 
ArtifactoryConfigLoaderMock(conf);
+
+        loaderMock.setData("Anything", "/location/of/this/artifact", "{ 
\"downloadUri\": \"anything\"}");
+        loaderMock.setData(null, null, "{ \"" + 
DaemonConfig.MULTITENANT_SCHEDULER_USER_POOLS + "\": {one: 1, two: 2, three: 
3}}");
+        Map ret = 
loaderMock.load(DaemonConfig.MULTITENANT_SCHEDULER_USER_POOLS);
+
+        Assert.assertNotNull("Unexpectedly returned null", ret);
+        Assert.assertEquals(1, ret.get("one"));
+        Assert.assertEquals(2, ret.get("two"));
+        Assert.assertEquals(3, ret.get("three"));
+
+        // Now let's load w/o setting up gets and we should still get valid 
map back
+        ArtifactoryConfigLoaderMock tc2 = new 
ArtifactoryConfigLoaderMock(conf);
+        Map ret2 = tc2.load(DaemonConfig.MULTITENANT_SCHEDULER_USER_POOLS);
+        Assert.assertNotNull("Unexpectedly returned null", ret2);
+        Assert.assertEquals(1, ret2.get("one"));
+        Assert.assertEquals(2, ret2.get("two"));
+        Assert.assertEquals(3, ret2.get("three"));
+    }
+
+    @Test
+    public void testMalformedYaml() throws Exception {
+        // This is a test where we are configured to point right at a single 
artifact
+        Config conf = new Config();
+        conf.put(DaemonConfig.SCHEDULER_CONFIG_LOADER_URI, 
ARTIFACTORY_HTTP_SCHEME_PREFIX + 
"bogushost.yahoo.com:9999/location/of/this/artifact");
+        conf.put(Config.STORM_LOCAL_DIR, tmpDirPath.toString());
+
+        ArtifactoryConfigLoaderMock loaderMock = new 
ArtifactoryConfigLoaderMock(conf);
+        loaderMock.setData("Anything", "/location/of/this/artifact", "{ 
\"downloadUri\": \"anything\"}");
+        loaderMock.setData(null, null, "ThisIsNotValidYaml");
+
+        Map ret = 
loaderMock.load(DaemonConfig.MULTITENANT_SCHEDULER_USER_POOLS);
+        Assert.assertNull("Unexpectedly returned a map", ret);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/f1f95449/storm-server/src/test/java/org/apache/storm/scheduler/utils/FileConfigLoaderTest.java
----------------------------------------------------------------------
diff --git 
a/storm-server/src/test/java/org/apache/storm/scheduler/utils/FileConfigLoaderTest.java
 
b/storm-server/src/test/java/org/apache/storm/scheduler/utils/FileConfigLoaderTest.java
new file mode 100644
index 0000000..33df1a1
--- /dev/null
+++ 
b/storm-server/src/test/java/org/apache/storm/scheduler/utils/FileConfigLoaderTest.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.scheduler.utils;
+
+import org.apache.storm.Config;
+import org.apache.storm.DaemonConfig;
+import org.apache.storm.utils.Time;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.File;
+import java.io.FileWriter;
+import java.util.HashMap;
+import java.util.Map;
+import org.yaml.snakeyaml.Yaml;
+
+public class FileConfigLoaderTest {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(FileConfigLoaderTest.class);
+
+    private static final String FILE_SCHEME_PREFIX = "file://";
+
+    @Test
+    public void testFileNotThere() {
+        Config conf = new Config();
+        conf.put(DaemonConfig.SCHEDULER_CONFIG_LOADER_URI, FILE_SCHEME_PREFIX 
+ "/file/not/exist/");
+        FileConfigLoader testLoader = new FileConfigLoader(conf);
+        Map result = 
testLoader.load(DaemonConfig.MULTITENANT_SCHEDULER_USER_POOLS);
+        Assert.assertNull("Unexpectedly returned a map", result);
+    }
+
+    @Test
+    public void testInvalidConfig() throws Exception {
+        Config conf = new Config();
+        FileConfigLoader testLoader = new FileConfigLoader(conf);
+        Map result = 
testLoader.load(DaemonConfig.MULTITENANT_SCHEDULER_USER_POOLS);
+        Assert.assertNull("Unexpectedly returned a map", result);
+    }
+
+    @Test
+    public void testMalformedYaml() throws Exception {
+
+        File temp = File.createTempFile("FileLoader", ".yaml");
+        temp.deleteOnExit();
+
+        FileWriter fw = new FileWriter(temp);
+        String outputData = "ThisIsNotValidYaml";
+        fw.write(outputData, 0, outputData.length());
+        fw.flush();
+        fw.close();
+
+        Config conf = new Config();
+        conf.put(DaemonConfig.SCHEDULER_CONFIG_LOADER_URI, FILE_SCHEME_PREFIX 
+ temp.getCanonicalPath());
+
+        FileConfigLoader testLoader = new FileConfigLoader(conf);
+        Map result = 
testLoader.load(DaemonConfig.MULTITENANT_SCHEDULER_USER_POOLS);
+        Assert.assertNull("Unexpectedly returned a map", result);
+    }
+
+    @Test
+    public void testValidFile() throws Exception {
+
+        File temp = File.createTempFile("FileLoader", ".yaml");
+        temp.deleteOnExit();
+
+        Map<String, Integer> testMap = new HashMap<String, Integer>();
+        testMap.put("a", 1);
+        testMap.put("b", 2);
+        testMap.put("c", 3);
+        testMap.put("d", 4);
+        testMap.put("e", 5);
+
+        Map<String, Map<String, Integer>> confMap = new HashMap<>();
+        confMap.put(DaemonConfig.MULTITENANT_SCHEDULER_USER_POOLS, testMap);
+
+        Yaml yaml = new Yaml();
+        FileWriter fw = new FileWriter(temp);
+        yaml.dump(confMap, fw);
+        fw.flush();
+        fw.close();
+
+        Config conf = new Config();
+        conf.put(DaemonConfig.SCHEDULER_CONFIG_LOADER_URI, FILE_SCHEME_PREFIX 
+ temp.getCanonicalPath());
+        FileConfigLoader loader = new FileConfigLoader(conf);
+
+        Map result = 
loader.load(DaemonConfig.MULTITENANT_SCHEDULER_USER_POOLS);
+
+        Assert.assertNotNull("Unexpectedly returned null", result);
+
+        Assert.assertEquals("Maps are a different size", 
testMap.keySet().size(), result.keySet().size());
+
+        for (String key : testMap.keySet() ) {
+            Integer expectedValue = testMap.get(key);
+            Integer returnedValue = (Integer) result.get(key);
+            Assert.assertEquals("Bad value for key=" + key, expectedValue, 
returnedValue);
+        }
+    }
+}
\ No newline at end of file

Reply via email to