Repository: storm Updated Branches: refs/heads/master 6c28dce02 -> 16c3b4f4c
STORM-188. Allow user to specifiy full configuration path when running storm command. Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/9a2c4129 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/9a2c4129 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/9a2c4129 Branch: refs/heads/master Commit: 9a2c4129b145b901f320238ea36a282ff7f44d0c Parents: 330e135 Author: Sriharsha Chintalapani <[email protected]> Authored: Tue Mar 31 11:37:30 2015 -0700 Committer: Sriharsha Chintalapani <[email protected]> Committed: Tue Mar 31 11:37:30 2015 -0700 ---------------------------------------------------------------------- .../src/jvm/backtype/storm/utils/Utils.java | 119 ++++++++++++------- 1 file changed, 77 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/9a2c4129/storm-core/src/jvm/backtype/storm/utils/Utils.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/utils/Utils.java b/storm-core/src/jvm/backtype/storm/utils/Utils.java index 4123f73..7e2d97b 100644 --- a/storm-core/src/jvm/backtype/storm/utils/Utils.java +++ b/storm-core/src/jvm/backtype/storm/utils/Utils.java @@ -45,6 +45,8 @@ import java.net.URLDecoder; import java.nio.ByteBuffer; import java.nio.channels.Channels; import java.nio.channels.WritableByteChannel; +import java.io.File; +import java.io.FileInputStream; import java.util.*; public class Utils { @@ -66,7 +68,7 @@ public class Utils { throw new RuntimeException(e); } } - + public static byte[] serialize(Object obj) { return serializationDelegate.serialize(obj); } @@ -120,7 +122,7 @@ public class Utils { throw new RuntimeException(e); } } - + public static List<URL> findResources(String name) { try { Enumeration<URL> resources = Thread.currentThread().getContextClassLoader().getResources(name); @@ -135,35 +137,68 @@ public class Utils { } public static Map findAndReadConfigFile(String name, boolean mustExist) { + InputStream in = null; + Boolean confFileEmpty = false; try { - HashSet<URL> resources = new HashSet<URL>(findResources(name)); - if(resources.isEmpty()) { - if(mustExist) throw new RuntimeException("Could not find config file on classpath " + name); - else return new HashMap(); - } - if(resources.size() > 1) { - throw new RuntimeException("Found multiple " + name + " resources. You're probably bundling the Storm jars with your topology jar. " - + resources); - } - URL resource = resources.iterator().next(); - Yaml yaml = new Yaml(new SafeConstructor()); - Map ret = null; - InputStream input = resource.openStream(); - try { - ret = (Map) yaml.load(new InputStreamReader(input)); - } finally { - input.close(); + in = getConfigFileInputStream(name); + if (null != in) { + Yaml yaml = new Yaml(new SafeConstructor()); + Map ret = (Map) yaml.load(new InputStreamReader(in)); + if (null != ret) { + return new HashMap(ret); + } else { + confFileEmpty = true; + } } - if(ret==null) ret = new HashMap(); - - return new HashMap(ret); - + if (mustExist) { + if(confFileEmpty) + throw new RuntimeException("Config file " + name + " doesn't have any valid storm configs"); + else + throw new RuntimeException("Could not find config file on classpath " + name); + } else { + return new HashMap(); + } } catch (IOException e) { throw new RuntimeException(e); + } finally { + if (null != in) { + try { + in.close(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } } } + private static InputStream getConfigFileInputStream(String configFilePath) + throws IOException { + if (null == configFilePath) { + throw new IOException( + "Could not find config file, name not specified"); + } + + HashSet<URL> resources = new HashSet<URL>(findResources(configFilePath)); + if (resources.isEmpty()) { + File configFile = new File(configFilePath); + if (configFile.exists()) { + return new FileInputStream(configFile); + } + } else if (resources.size() > 1) { + throw new IOException( + "Found multiple " + configFilePath + + " resources. You're probably bundling the Storm jars with your topology jar. " + + resources); + } else { + LOG.info("Using "+configFilePath+" from resources"); + URL resource = resources.iterator().next(); + return resource.openStream(); + } + return null; + } + + public static Map findAndReadConfigFile(String name) { return findAndReadConfigFile(name, true); } @@ -171,7 +206,7 @@ public class Utils { public static Map readDefaultConfig() { return findAndReadConfigFile("defaults.yaml", true); } - + public static Map readCommandLineOpts() { Map ret = new HashMap(); String commandOptions = System.getProperty("storm.options"); @@ -205,7 +240,7 @@ public class Utils { ret.putAll(readCommandLineOpts()); return ret; } - + private static Object normalizeConf(Object conf) { if(conf==null) return new HashMap(); if(conf instanceof Map) { @@ -230,7 +265,7 @@ public class Utils { return conf; } } - + public static boolean isValidConf(Map<String, Object> stormConf) { return normalizeConf(stormConf).equals(normalizeConf((Map) JSONValue.parse(JSONValue.toJSONString(stormConf)))); } @@ -252,7 +287,7 @@ public class Utils { } return ret; } - + public static List<Object> tuple(Object... values) { List<Object> ret = new ArrayList<Object>(); for(Object v: values) { @@ -272,20 +307,20 @@ public class Utils { } out.close(); } - + public static IFn loadClojureFn(String namespace, String name) { try { clojure.lang.Compiler.eval(RT.readString("(require '" + namespace + ")")); } catch (Exception e) { //if playing from the repl and defining functions, file won't exist } - return (IFn) RT.var(namespace, name).deref(); + return (IFn) RT.var(namespace, name).deref(); } - + public static boolean isSystemId(String id) { return id.startsWith("__"); } - + public static <K, V> Map<V, K> reverseMap(Map<K, V> map) { Map<V, K> ret = new HashMap<V, K>(); for(K key: map.keySet()) { @@ -293,7 +328,7 @@ public class Utils { } return ret; } - + public static ComponentCommon getComponentCommon(StormTopology topology, String id) { if(topology.get_spouts().containsKey(id)) { return topology.get_spouts().get(id).get_common(); @@ -306,7 +341,7 @@ public class Utils { } throw new IllegalArgumentException("Could not find component with id " + id); } - + public static Integer getInt(Object o) { Integer result = getInt(o, null); if (null == result) { @@ -314,7 +349,7 @@ public class Utils { } return result; } - + public static Integer getInt(Object o, Integer defaultValue) { if (null == o) { return defaultValue; @@ -340,18 +375,18 @@ public class Utils { if (null == o) { return defaultValue; } - + if(o instanceof Boolean) { return (Boolean) o; } else { throw new IllegalArgumentException("Don't know how to convert " + o + " + to boolean"); } } - + public static long secureRandomLong() { return UUID.randomUUID().getLeastSignificantBits(); } - + public static CuratorFramework newCurator(Map conf, List<String> servers, Object port, String root) { return newCurator(conf, servers, port, root, null); } @@ -365,7 +400,7 @@ public class Utils { CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder(); setupBuilder(builder, zkStr, conf, auth); - + return builder.build(); } @@ -398,8 +433,8 @@ public class Utils { CuratorFramework ret = newCurator(conf, servers, port, auth); ret.start(); return ret; - } - + } + /** * (defn integer-divided [sum num-pieces] @@ -412,9 +447,9 @@ public class Utils { ))) * @param sum * @param numPieces - * @return + * @return */ - + public static TreeMap<Integer, Integer> integerDivided(int sum, int numPieces) { int base = sum / numPieces; int numInc = sum % numPieces;
