[ https://issues.apache.org/jira/browse/STORM-1226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15137456#comment-15137456 ]
ASF GitHub Bot commented on STORM-1226: --------------------------------------- Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/1074#discussion_r52207624 --- Diff: storm-core/src/jvm/org/apache/storm/utils/Utils.java --- @@ -1370,9 +1469,974 @@ public static int toPositive(int number) { public static RuntimeException wrapInRuntime(Exception e){ if (e instanceof RuntimeException){ return (RuntimeException)e; - }else { + } else { return new RuntimeException(e); } } -} + /** + * Determines if a zip archive contains a particular directory. + * + * @param zipfile path to the zipped file + * @param target directory being looked for in the zip. + * @return boolean whether or not the directory exists in the zip. + */ + public static boolean zipDoesContainDir(String zipfile, String target) throws IOException { + List<ZipEntry> entries = (List<ZipEntry>)Collections.list(new ZipFile(zipfile).entries()); + + if(entries == null) { + return false; + } + + String targetDir = target + "/"; + for(ZipEntry entry : entries) { + String name = entry.getName(); + if(name.startsWith(targetDir)) { + return true; + } + } + + return false; + } + + /** + * Joins any number of maps together into a single map, combining their values into + * a list, maintaining values in the order the maps were passed in. Nulls are inserted + * for given keys when the map does not contain that key. + * + * i.e. joinMaps({'a' => 1, 'b' => 2}, {'b' => 3}, {'a' => 4, 'c' => 5}) -> + * {'a' => [1, null, 4], 'b' => [2, 3, null], 'c' => [null, null, 5]} + * + * @param maps variable number of maps to join - order affects order of values in output. + * @return combined map + */ + public static <K, V> Map<K, List<V>> joinMaps(Map<K, V>... 
maps) { + Map<K, List<V>> ret = new HashMap<>(); + + Set<K> keys = new HashSet<>(); + + for(Map<K, V> map : maps) { + keys.addAll(map.keySet()); + } + + for(Map<K, V> m : maps) { + for(K key : keys) { + V value = m.get(key); + + if(!ret.containsKey(key)) { + ret.put(key, new ArrayList<V>()); + } + + List<V> targetList = ret.get(key); + targetList.add(value); + } + } + return ret; + } + + /** + * Fills up chunks out of a collection (given a maximum amount of chunks) + * + * i.e. partitionFixed(5, [1,2,3]) -> [[1,2,3]] + * partitionFixed(5, [1..9]) -> [[1,2], [3,4], [5,6], [7,8], [9]] + * partitionFixed(3, [1..10]) -> [[1,2,3,4], [5,6,7], [8,9,10]] + * @param maxNumChunks the maximum number of chunks to return + * @param coll the collection to be chunked up + * @return a list of the chunks, which are themselves lists. + */ + public static <T> List<List<T>> partitionFixed(int maxNumChunks, Collection<T> coll) { + List<List<T>> ret = new ArrayList<>(); + + if(maxNumChunks == 0 || coll == null) { + return ret; + } + + Map<Integer, Integer> parts = integerDivided(coll.size(), maxNumChunks); + + // Keys sorted in descending order + List<Integer> sortedKeys = new ArrayList<Integer>(parts.keySet()); + Collections.sort(sortedKeys, Collections.reverseOrder()); + + + Iterator<T> it = coll.iterator(); + for(Integer chunkSize : sortedKeys) { + if(!it.hasNext()) { break; } + Integer times = parts.get(chunkSize); + for(int i = 0; i < times; i++) { + if(!it.hasNext()) { break; } + List<T> chunkList = new ArrayList<>(); + for(int j = 0; j < chunkSize; j++) { + if(!it.hasNext()) { break; } + chunkList.add(it.next()); + } + ret.add(chunkList); + } + } + + return ret; + } + + /** + * Return a new instance of a pluggable specified in the conf. + * @param conf The conf to read from. + * @param configKey The key pointing to the pluggable class + * @return an instance of the class or null if it is not specified. 
+ */ + public static Object getConfiguredClass(Map conf, Object configKey) { + if (conf.containsKey(configKey)) { + return newInstance((String)conf.get(configKey)); + } + return null; + } + + public static String logsFilename(String stormId, int port) { + return stormId + FILE_PATH_SEPARATOR + Integer.toString(port) + FILE_PATH_SEPARATOR + "worker.log"; + } + + public static String eventLogsFilename(String stormId, int port) { + return stormId + FILE_PATH_SEPARATOR + Integer.toString(port) + FILE_PATH_SEPARATOR + "events.log"; + } + + public static Object readYamlFile(String yamlFile) { + try (FileReader reader = new FileReader(yamlFile)) { + return new Yaml(new SafeConstructor()).load(reader); + } + catch(Exception ex) { + LOG.error("Failed to read yaml file.", ex); + } + return null; + } + + public static void setupDefaultUncaughtExceptionHandler() { + Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { + public void uncaughtException(Thread thread, Throwable thrown) { + try { + handleUncaughtException(thrown); + } + catch (Error err) { + LOG.error("Received error in main thread.. terminating server...", err); + Runtime.getRuntime().exit(-2); + } + } + }); + } + + /** + * Creates a new map with a string value in the map replaced with an + * equivalently-lengthed string of '#'. + * @param m The map that a value will be redacted from + * @param key The key pointing to the value to be redacted + * @return a new map with the value redacted. The original map will not be modified. 
+ */ + public static Map redactValue(Map<Object, String> m, Object key) { + if(m.containsKey(key)) { + HashMap<Object, String> newMap = new HashMap<>(m); + String value = newMap.get(key); + String redacted = new String(new char[value.length()]).replace("\0", "#"); + newMap.put(key, redacted); + return newMap; + } + return m; + } + + public static void logThriftAccess(Integer requestId, InetAddress remoteAddress, Principal principal, String operation) { + new ThriftAccessLogger().log( + String.format("Request ID: {} access from: {} principal: {} operation: {}", + requestId, remoteAddress, principal, operation)); + } + + /** + * Make sure a given key name is valid for the storm config. + * Throw RuntimeException if the key isn't valid. + * @param name The name of the config key to check. + */ + public static void validateKeyName(String name) { + Set<String> disallowedKeys = new HashSet<>(); + disallowedKeys.add("/"); + disallowedKeys.add("."); + disallowedKeys.add(":"); + disallowedKeys.add("\\"); + + for(String key : disallowedKeys) { + if( name.contains(key) ) { + throw new RuntimeException("Key name cannot contain any of the following: " + disallowedKeys.toString()); + } + } + if(name.trim().isEmpty()) { + throw new RuntimeException("Key name cannot be blank"); + } + } + + //Everything from here on is translated from the old util.clj (storm-core/src/clj/backtype.storm/util.clj) + + public static final boolean IS_ON_WINDOWS = "Windows_NT".equals(System.getenv("OS")); + + public static final String FILE_PATH_SEPARATOR = System.getProperty("file.separator"); + + public static final String CLASS_PATH_SEPARATOR = System.getProperty("path.separator"); + + public static final int SIGKILL = 9; + public static final int SIGTERM = 15; + + + + /** + * Find the first item of coll for which pred.test(...) returns true. + * @param pred The IPredicate to test for + * @param coll The Collection of items to search through. 
+ * @return The first matching value in coll, or null if nothing matches. + */ + public static Object findFirst (IPredicate pred, Collection coll) { + if (coll == null || pred == null) { + return null; + } else { + Iterator<Object> iter = coll.iterator(); + while(iter != null && iter.hasNext()) { + Object obj = iter.next(); + if (pred.test(obj)) { + return obj; + } + } + return null; + } + } + + public static Object findFirst (IPredicate pred, Map map) { + if (map == null || pred == null) { + return null; + } else { + Iterator<Object> iter = map.entrySet().iterator(); + while(iter != null && iter.hasNext()) { + Object obj = iter.next(); + if (pred.test(obj)) { + return obj; + } + } + return null; + } + } + + public static String localHostname () throws UnknownHostException { + return _instance.localHostnameImpl(); + } + + // Non-static impl methods exist for mocking purposes. + protected String localHostnameImpl () throws UnknownHostException { + return InetAddress.getLocalHost().getCanonicalHostName(); + } + + private static String memoizedLocalHostnameString = null; + + public static String memoizedLocalHostname () throws UnknownHostException { + if (memoizedLocalHostnameString == null) { + memoizedLocalHostnameString = localHostname(); + } + return memoizedLocalHostnameString; + } + + /** + * Gets the storm.local.hostname value, or tries to figure out the local hostname + * if it is not set in the config. + * @param conf The storm config to read from + * @return a string representation of the hostname. 
+ */ + public static String hostname (Map<String, Object> conf) throws UnknownHostException { + if (conf == null) { + return memoizedLocalHostname(); + } + Object hostnameString = conf.get(Config.STORM_LOCAL_HOSTNAME); + if (hostnameString == null ) { + return memoizedLocalHostname(); + } + if (hostnameString.equals("")) { + return memoizedLocalHostname(); + } + return hostnameString.toString(); + } + + public static String uuid() { + return UUID.randomUUID().toString(); + } + + public static long secsToMillisLong(double secs) { + return (long) (1000 * secs); + } + + public static Vector<String> tokenizePath (String path) { + String[] tokens = path.split("/"); + Vector<String> outputs = new Vector<String>(); + if (tokens == null || tokens.length == 0) { + return null; + } + for (String tok: tokens) { + if (!tok.isEmpty()) { + outputs.add(tok); + } + } + return outputs; + } + + public static String parentPath(String path) { + if (path == null) { + return "/"; + } + Vector<String> tokens = tokenizePath(path); + int length = tokens.size(); + if (length == 0) { + return "/"; + } + String output = ""; + for (int i = 0; i < length - 1; i++) { //length - 1 to mimic "butlast" from the old clojure code + output = output + "/" + tokens.get(i); + } + return output; + } + + public static String toksToPath (Vector<String> toks) { + if (toks == null || toks.size() == 0) { + return "/"; + } + + String output = ""; + for (int i = 0; i < toks.size(); i++) { + output = output + "/" + toks.get(i); + } + return output; + } + public static String normalizePath (String path) { + return toksToPath(tokenizePath(path)); + } + + public static void exitProcess (int val, Object... 
msg) { + StringBuilder errorMessage = new StringBuilder(); + errorMessage.append("halting process: "); + for (Object oneMessage: msg) { + errorMessage.append(oneMessage); + } + String combinedErrorMessage = errorMessage.toString(); + LOG.error(combinedErrorMessage, new RuntimeException(combinedErrorMessage)); + Runtime.getRuntime().exit(val); + } + + public static Object defaulted(Object val, Object defaultObj) { + if (val != null) { + return val; + } else { + return defaultObj; + } + } + + /** + * "{:a 1 :b 1 :c 2} -> {1 [:a :b] 2 :c}" + * + * Example usage in java: + * Map<Integer, String> tasks; + * Map<String, List<Integer>> componentTasks = Utils.reverse_map(tasks); + * + * @param map + * @return + */ + public static <K, V> HashMap<V, List<K>> reverseMap(Map<K, V> map) { + HashMap<V, List<K>> rtn = new HashMap<V, List<K>>(); + if (map == null) { + return rtn; + } + for (Entry<K, V> entry : map.entrySet()) { + K key = entry.getKey(); + V val = entry.getValue(); + List<K> list = rtn.get(val); + if (list == null) { + list = new ArrayList<K>(); + rtn.put(entry.getValue(), list); + } + list.add(key); + } + return rtn; + } + + /** + * "{:a 1 :b 1 :c 2} -> {1 [:a :b] 2 :c}" + * + */ + public static HashMap reverseMap(List listSeq) { + HashMap<Object, List<Object>> rtn = new HashMap(); + if (listSeq == null) { + return rtn; + } + for (Object entry : listSeq) { + List listEntry = (List) entry; + Object key = listEntry.get(0); + Object val = listEntry.get(1); + List list = rtn.get(val); + if (list == null) { + list = new ArrayList<Object>(); + rtn.put(val, list); + } + list.add(key); + } + return rtn; + } + + + /** + * Gets the pid of this JVM, because Java doesn't provide a real way to do this. 
+ * + * @return + */ + public static String processPid() throws RuntimeException { + String name = ManagementFactory.getRuntimeMXBean().getName(); + String[] split = name.split("@"); + if (split.length != 2) { + throw new RuntimeException("Got unexpected process name: " + name); + } + return split[0]; + } + + public static int execCommand(String command) throws ExecuteException, IOException { + String[] cmdlist = command.split(" "); + CommandLine cmd = new CommandLine(cmdlist[0]); + for (int i = 1; i < cmdlist.length; i++) { + cmd.addArgument(cmdlist[i]); + } + + DefaultExecutor exec = new DefaultExecutor(); + return exec.execute(cmd); + } + + /** + * Extra dir from the jar to destdir + * + * @param jarpath + * @param dir + * @param destdir + * + (with-open [jarpath (ZipFile. jarpath)] + (let [entries (enumeration-seq (.entries jarpath))] + (doseq [file (filter (fn [entry](and (not (.isDirectory entry)) (.startsWith (.getName entry) dir))) entries)] + (.mkdirs (.getParentFile (File. destdir (.getName file)))) + (with-open [out (FileOutputStream. (File. 
destdir (.getName file)))] + (io/copy (.getInputStream jarpath file) out))))) + + */ + public static void extractDirFromJar(String jarpath, String dir, String destdir) { + JarFile jarFile = null; + FileOutputStream out = null; + InputStream in = null; + try { + jarFile = new JarFile(jarpath); + Enumeration<JarEntry> jarEnums = jarFile.entries(); + while (jarEnums.hasMoreElements()) { + JarEntry entry = jarEnums.nextElement(); + if (!entry.isDirectory() && entry.getName().startsWith(dir)) { + File aFile = new File(destdir, entry.getName()); + aFile.getParentFile().mkdirs(); + out = new FileOutputStream(aFile); + in = jarFile.getInputStream(entry); + IOUtils.copy(in, out); + out.close(); + in.close(); + } + } + } catch (IOException e) { + LOG.info("Could not extract {} from {}", dir, jarpath); + } finally { + if (jarFile != null) { + try { + jarFile.close(); + } catch (IOException e) { + throw new RuntimeException( + "Something really strange happened when trying to close the jar file" + jarpath); + } + } + if (out != null) { + try { + out.close(); + } catch (IOException e) { + throw new RuntimeException( + "Something really strange happened when trying to close the output for jar file" + jarpath); + } + } + if (in != null) { + try { + in.close(); + } catch (IOException e) { + throw new RuntimeException( + "Something really strange happened when trying to close the input for jar file" + jarpath); + } + } + } + + } + + public static int sendSignalToProcess(long pid, int signum) { + int retval = 0; + try { + String killString = null; + if (isOnWindows()) { + if (signum == SIGKILL) { + killString = "taskkill /f /pid "; + } else { + killString = "taskkill /pid "; + } + } else { + killString = "kill -" + signum + " "; + } + killString = killString + pid; + retval = execCommand(killString); + } catch (ExecuteException e) { + LOG.info("Error when trying to kill " + pid + ". 
Process is probably already dead."); + } catch (IOException e) { + LOG.info("IOException Error when trying to kill " + pid + "."); + } finally { + return retval; + } + } + + public static int forceKillProcess (long pid) { + return sendSignalToProcess(pid, SIGKILL); + } + + public static int forceKillProcess (String pid) { + return sendSignalToProcess(Long.parseLong(pid), SIGKILL); + } + + public static int killProcessWithSigTerm (long pid) { + return sendSignalToProcess(pid, SIGTERM); + } + public static int killProcessWithSigTerm (String pid) { + return sendSignalToProcess(Long.parseLong(pid), SIGTERM); + } + + /* + Adds the user supplied function as a shutdown hook for cleanup. + Also adds a function that sleeps for a second and then sends kill -9 + to process to avoid any zombie process in case cleanup function hangs. + */ + public static void addShutdownHookWithForceKillIn1Sec (Runnable func) { + Runnable sleepKill = new Runnable() { + @Override + public void run() { + try { + Time.sleepSecs(1); + Runtime.getRuntime().halt(20); + } catch (Exception e) { + LOG.warn("Exception in the ShutDownHook: " + e); + } + } + }; + Runtime.getRuntime().addShutdownHook(new Thread(func)); + Runtime.getRuntime().addShutdownHook(new Thread(sleepKill)); + } + + /** + * Returns the combined string, escaped for posix shell. 
+ * @param command the list of strings to be combined + * @return the resulting command string + */ + public static String shellCmd (List<String> command) { + List<String> changedCommands = new LinkedList<>(); + for (String str: command) { + if (str == null) { + continue; + } + changedCommands.add("'" + str.replaceAll("'", "'\"'\"'") + "'"); + } + return StringUtils.join(changedCommands, " "); + } + + public static String scriptFilePath (String dir) { + return dir + FILE_PATH_SEPARATOR + "storm-worker-script.sh"; + } + + public static String containerFilePath (String dir) { + return dir + FILE_PATH_SEPARATOR + "launch_container.sh"; + } + + public static void throwRuntime (Object... strings) { + String combinedErrorMessage = ""; + for (Object oneMessage: strings) { + combinedErrorMessage = combinedErrorMessage + oneMessage.toString(); + } + throw new RuntimeException(combinedErrorMessage); --- End diff -- String concatenation is broken here if a nil/null is passed in (oneMessage.toString() throws a NullPointerException). We can at least switch to using StringBuilder, and I would really prefer to just see this inlined everywhere. Is it really that much simpler to do ``` (Utils/throwRuntime [a b c]) ``` over ``` (throw (RuntimeException. (str a b c))) ```? > Port backtype.storm.util to java > -------------------------------- > > Key: STORM-1226 > URL: https://issues.apache.org/jira/browse/STORM-1226 > Project: Apache Storm > Issue Type: New Feature > Components: storm-core > Reporter: Robert Joseph Evans > Assignee: Reza Farivar > Labels: java-migration, jstorm-merger > > Port backtype.storm.util from clojure to java. In as many instances as > possible the same interface should be maintained, and calls to clojure > functions in the rest of the code should be replaced with calls to the > corresponding java code. > Some similar functions can be found at > https://github.com/apache/storm/blob/jstorm-import/jstorm-core/src/main/java/com/alibaba/jstorm/utils/JStormUtils.java > although they are not identical. 
> For function callbacks we may need to evaluate adding in appropriate callback > interfaces instead. Please try to avoid using clojure internal java classes > unless necessary. -- This message was sent by Atlassian JIRA (v6.3.4#6332)