Github user d2r commented on a diff in the pull request:
https://github.com/apache/storm/pull/1074#discussion_r52499915
--- Diff: storm-core/src/jvm/org/apache/storm/utils/Utils.java ---
@@ -1370,9 +1444,820 @@ public static int toPositive(int number) {
public static RuntimeException wrapInRuntime(Exception e){
if (e instanceof RuntimeException){
return (RuntimeException)e;
- }else {
+ } else {
return new RuntimeException(e);
}
}
-}
+ /**
+ * Determines if a zip archive contains a particular directory.
+ *
+ * @param zipfile path to the zipped file
+ * @param target directory being looked for in the zip.
+ * @return boolean whether or not the directory exists in the zip.
+ */
+ public static boolean zipDoesContainDir(String zipfile, String target)
throws IOException {
+ List<ZipEntry> entries = (List<ZipEntry>)Collections.list(new
ZipFile(zipfile).entries());
+
+ String targetDir = target + "/";
+ for(ZipEntry entry : entries) {
+ String name = entry.getName();
+ if(name.startsWith(targetDir)) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ /**
+ * Joins any number of maps together into a single map, combining
their values into
+ * a list, maintaining values in the order the maps were passed in.
Nulls are inserted
+ * for given keys when the map does not contain that key.
+ *
+ * i.e. joinMaps({'a' => 1, 'b' => 2}, {'b' => 3}, {'a' => 4, 'c' =>
5}) ->
+ * {'a' => [1, null, 4], 'b' => [2, 3, null], 'c' => [null, null,
5]}
+ *
+ * @param maps variable number of maps to join - order affects order
of values in output.
+ * @return combined map
+ */
+ public static <K, V> Map<K, List<V>> joinMaps(Map<K, V>... maps) {
+ Map<K, List<V>> ret = new HashMap<>();
+
+ Set<K> keys = new HashSet<>();
+
+ for(Map<K, V> map : maps) {
+ keys.addAll(map.keySet());
+ }
+
+ for(Map<K, V> m : maps) {
+ for(K key : keys) {
+ V value = m.get(key);
+
+ if(!ret.containsKey(key)) {
+ ret.put(key, new ArrayList<V>());
+ }
+
+ List<V> targetList = ret.get(key);
+ targetList.add(value);
+ }
+ }
+ return ret;
+ }
+
+ /**
+ * Fills up chunks out of a collection (given a maximum amount of
chunks)
+ *
+ * i.e. partitionFixed(5, [1,2,3]) -> [[1,2,3]]
+ * partitionFixed(5, [1..9]) -> [[1,2], [3,4], [5,6], [7,8], [9]]
+ * partitionFixed(3, [1..10]) -> [[1,2,3,4], [5,6,7], [8,9,10]]
+ * @param maxNumChunks the maximum number of chunks to return
+ * @param coll the collection to be chunked up
+ * @return a list of the chunks, which are themselves lists.
+ */
+ public static <T> List<List<T>> partitionFixed(int maxNumChunks,
Collection<T> coll) {
+ List<List<T>> ret = new ArrayList<>();
+
+ if(maxNumChunks == 0 || coll == null) {
+ return ret;
+ }
+
+ Map<Integer, Integer> parts = integerDivided(coll.size(),
maxNumChunks);
+
+ // Keys sorted in descending order
+ List<Integer> sortedKeys = new ArrayList<Integer>(parts.keySet());
+ Collections.sort(sortedKeys, Collections.reverseOrder());
+
+
+ Iterator<T> it = coll.iterator();
+ for(Integer chunkSize : sortedKeys) {
+ if(!it.hasNext()) { break; }
+ Integer times = parts.get(chunkSize);
+ for(int i = 0; i < times; i++) {
+ if(!it.hasNext()) { break; }
+ List<T> chunkList = new ArrayList<>();
+ for(int j = 0; j < chunkSize; j++) {
+ if(!it.hasNext()) { break; }
+ chunkList.add(it.next());
+ }
+ ret.add(chunkList);
+ }
+ }
+
+ return ret;
+ }
+
+ /**
+ * Return a new instance of a pluggable specified in the conf.
+ * @param conf The conf to read from.
+ * @param configKey The key pointing to the pluggable class
+ * @return an instance of the class or null if it is not specified.
+ */
+ public static Object getConfiguredClass(Map conf, Object configKey) {
+ if (conf.containsKey(configKey)) {
+ return newInstance((String)conf.get(configKey));
+ }
+ return null;
+ }
+
+ public static String logsFilename(String stormId, int port) {
+ return stormId + FILE_PATH_SEPARATOR + port + FILE_PATH_SEPARATOR
+ "worker.log";
+ }
+
+ public static String eventLogsFilename(String stormId, int port) {
+ return stormId + FILE_PATH_SEPARATOR + port + FILE_PATH_SEPARATOR
+ "events.log";
+ }
+
+ public static Object readYamlFile(String yamlFile) {
+ try (FileReader reader = new FileReader(yamlFile)) {
+ return new Yaml(new SafeConstructor()).load(reader);
+ } catch(Exception ex) {
+ LOG.error("Failed to read yaml file.", ex);
+ }
+ return null;
+ }
+
+ public static void setupDefaultUncaughtExceptionHandler() {
+ Thread.setDefaultUncaughtExceptionHandler(new
Thread.UncaughtExceptionHandler() {
+ public void uncaughtException(Thread thread, Throwable
thrown) {
+ try {
+ handleUncaughtException(thrown);
+ } catch (Error err) {
+ LOG.error("Received error in main thread..
terminating server...", err);
+ Runtime.getRuntime().exit(-2);
+ }
+ }
+ });
+ }
+
+ /**
+ * Creates a new map with a string value in the map replaced with an
+ * equivalently-lengthed string of '#'.
+ * @param m The map that a value will be redacted from
+ * @param key The key pointing to the value to be redacted
+ * @return a new map with the value redacted. The original map will
not be modified.
+ */
+ public static Map<Object, String> redactValue(Map<Object, String> m,
Object key) {
+ if(m.containsKey(key)) {
+ HashMap<Object, String> newMap = new HashMap<>(m);
+ String value = newMap.get(key);
+ String redacted = new String(new
char[value.length()]).replace("\0", "#");
+ newMap.put(key, redacted);
+ return newMap;
+ }
+ return m;
+ }
+
+ /**
+ * Make sure a given key name is valid for the storm config.
+ * Throw RuntimeException if the key isn't valid.
+ * @param name The name of the config key to check.
+ */
+ private static final Set<String> disallowedKeys = new
HashSet<>(Arrays.asList(new String[] {"/", ".", ":", "\\"}));
+ public static void validateKeyName(String name) {
+
+ for(String key : disallowedKeys) {
+ if( name.contains(key) ) {
+ throw new RuntimeException("Key name cannot contain any of
the following: " + disallowedKeys.toString());
+ }
+ }
+ if(name.trim().isEmpty()) {
+ throw new RuntimeException("Key name cannot be blank");
+ }
+ }
+
+ /**
+ * Find the first item of coll for which pred.test(...) returns true.
+ * @param pred The IPredicate to test for
+ * @param coll The Collection of items to search through.
+ * @return The first matching value in coll, or null if nothing
matches.
+ */
+ public static <T> T findFirst (IPredicate<T> pred, Collection<T> coll)
{
+ if(coll == null) {
+ return null;
+ }
+ for(T elem : coll) {
+ if (pred.test(elem)) {
+ return elem;
+ }
+ }
+ return null;
+ }
+
+ public static <T, U> T findFirst (IPredicate<T> pred, Map<U, T> map) {
+ if(map == null) {
+ return null;
+ }
+ return findFirst(pred, (Set<T>)map.entrySet());
+ }
+
+ public static String localHostname () throws UnknownHostException {
+ return _instance.localHostnameImpl();
+ }
+
+ // Non-static impl methods exist for mocking purposes.
+ protected String localHostnameImpl () throws UnknownHostException {
+ return InetAddress.getLocalHost().getCanonicalHostName();
+ }
+
+ private static String memoizedLocalHostnameString = null;
+
+ public static String memoizedLocalHostname () throws
UnknownHostException {
+ if (memoizedLocalHostnameString == null) {
+ memoizedLocalHostnameString = localHostname();
+ }
+ return memoizedLocalHostnameString;
+ }
+
+ /**
+ * Gets the storm.local.hostname value, or tries to figure out the
local hostname
+ * if it is not set in the config.
+ * @param conf The storm config to read from
+ * @return a string representation of the hostname.
+ */
+ public static String hostname (Map<String, Object> conf) throws
UnknownHostException {
+ if (conf == null) {
+ return memoizedLocalHostname();
+ }
+ Object hostnameString = conf.get(Config.STORM_LOCAL_HOSTNAME);
+ if (hostnameString == null || hostnameString.equals("")) {
+ return memoizedLocalHostname();
+ }
+ return (String)hostnameString;
+ }
+
+ public static String uuid() {
+ return UUID.randomUUID().toString();
+ }
+
+ public static void exitProcess (int val, Object... msg) {
+ StringBuilder errorMessage = new StringBuilder();
+ errorMessage.append("Halting process: ");
+ for (Object oneMessage: msg) {
+ errorMessage.append(oneMessage);
+ }
+ String combinedErrorMessage = errorMessage.toString();
+ LOG.error(combinedErrorMessage, new
RuntimeException(combinedErrorMessage));
+ Runtime.getRuntime().exit(val);
+ }
+
+ /**
+ * "{:a 1 :b 2} -> {1 :a 2 :b}"
+ *
+ * Note: Only one key wins if there are duplicate values.
+ * Which key wins is indeterminate:
+ * "{:a 1 :b 1} -> {1 :a} *or* {1 :b}"
+ */
+ public static <K, V> Map<V, K> simpleReverseMap(Map<K, V> map) {
+ Map<V, K> ret = new HashMap<V, K>();
+ for (Map.Entry<K, V> entry : map.entrySet()) {
+ ret.put(entry.getValue(), entry.getKey());
+ }
+ return ret;
+ }
+
+ /**
+ * "{:a 1 :b 1 :c 2} -> {1 [:a :b] 2 :c}"
+ *
+ * Example usage in java:
+ * Map<Integer, String> tasks;
+ * Map<String, List<Integer>> componentTasks =
Utils.reverse_map(tasks);
+ *
+ * @param map
+ * @return
+ */
+ public static <K, V> HashMap<V, List<K>> reverseMap(Map<K, V> map) {
+ HashMap<V, List<K>> rtn = new HashMap<V, List<K>>();
+ if (map == null) {
+ return rtn;
+ }
+ for (Entry<K, V> entry : map.entrySet()) {
+ K key = entry.getKey();
+ V val = entry.getValue();
+ List<K> list = rtn.get(val);
+ if (list == null) {
+ list = new ArrayList<K>();
+ rtn.put(entry.getValue(), list);
+ }
+ list.add(key);
+ }
+ return rtn;
+ }
+
+ /**
+ * "{:a 1 :b 1 :c 2} -> {1 [:a :b] 2 :c}"
+ *
+ */
+ public static HashMap reverseMap(List listSeq) {
+ HashMap<Object, List<Object>> rtn = new HashMap();
+ if (listSeq == null) {
+ return rtn;
+ }
+ for (Object entry : listSeq) {
+ List listEntry = (List) entry;
+ Object key = listEntry.get(0);
+ Object val = listEntry.get(1);
+ List list = rtn.get(val);
+ if (list == null) {
+ list = new ArrayList<Object>();
+ rtn.put(val, list);
+ }
+ list.add(key);
+ }
+ return rtn;
+ }
+
+
+ /**
+ * @return the pid of this JVM, because Java doesn't provide a real
way to do this.
+ */
+ public static String processPid() {
+ String name = ManagementFactory.getRuntimeMXBean().getName();
+ String[] split = name.split("@");
+ if (split.length != 2) {
+ throw new RuntimeException("Got unexpected process name: " +
name);
+ }
+ return split[0];
+ }
+
+ public static int execCommand(String... command) throws
ExecuteException, IOException {
+ CommandLine cmd = new CommandLine(command[0]);
+ for (int i = 1; i < command.length; i++) {
+ cmd.addArgument(command[i]);
+ }
+
+ DefaultExecutor exec = new DefaultExecutor();
+ return exec.execute(cmd);
+ }
+
+ /**
+ * Extract dir from the jar to destdir
+ *
+ * @param jarpath Path to the jar file
+ * @param dir Directory in the jar to pull out
+ * @param destdir Path to the directory where the extracted directory
will be put
+ *
+ */
+ public static void extractDirFromJar(String jarpath, String dir,
String destdir) {
+ try (JarFile jarFile = new JarFile(jarpath)) {
+ Enumeration<JarEntry> jarEnums = jarFile.entries();
+ while (jarEnums.hasMoreElements()) {
+ JarEntry entry = jarEnums.nextElement();
+ if (!entry.isDirectory() &&
entry.getName().startsWith(dir)) {
+ File aFile = new File(destdir, entry.getName());
+ aFile.getParentFile().mkdirs();
+ try (FileOutputStream out = new
FileOutputStream(aFile);
+ InputStream in = jarFile.getInputStream(entry)) {
+ IOUtils.copy(in, out);
+ }
+ }
+ }
+ } catch (IOException e) {
+ LOG.info("Could not extract {} from {}", dir, jarpath);
+ }
+ }
+
+ public static void sendSignalToProcess(long lpid, int signum) throws
IOException {
+ String pid = Long.toString(lpid);
+ try {
+ if (isOnWindows()) {
+ if (signum == SIGKILL) {
+ execCommand("taskkill", "/f", "/pid", pid);
+ } else {
+ execCommand("taskkill", "/pid", pid);
+ }
+ } else {
+ execCommand("kill", "-" + signum, pid);
+ }
+ } catch (ExecuteException e) {
+ LOG.info("Error when trying to kill " + pid + ". Process is
probably already dead.");
+ } catch (IOException e) {
+ LOG.info("IOException Error when trying to kill " + pid + ".");
+ throw e;
+ }
+ }
+
+ public static void forceKillProcess (String pid) throws IOException {
+ sendSignalToProcess(Long.parseLong(pid), SIGKILL);
+ }
+
+ public static void killProcessWithSigTerm (String pid) throws
IOException {
+ sendSignalToProcess(Long.parseLong(pid), SIGTERM);
+ }
+
+ /**
+ * Adds the user supplied function as a shutdown hook for cleanup.
+ * Also adds a function that sleeps for a second and then halts the
+ * runtime to avoid any zombie process in case cleanup function hangs.
+ */
+ public static void addShutdownHookWithForceKillIn1Sec (Runnable func) {
+ Runnable sleepKill = new Runnable() {
+ @Override
+ public void run() {
+ try {
+ Time.sleepSecs(1);
+ Runtime.getRuntime().halt(20);
+ } catch (Exception e) {
+ LOG.warn("Exception in the ShutDownHook", e);
+ }
+ }
+ };
+ Runtime.getRuntime().addShutdownHook(new Thread(func));
+ Runtime.getRuntime().addShutdownHook(new Thread(sleepKill));
+ }
+
+ /**
+ * Returns the combined string, escaped for posix shell.
+ * @param command the list of strings to be combined
+ * @return the resulting command string
+ */
+ public static String shellCmd (List<String> command) {
+ List<String> changedCommands = new ArrayList<>(command.size());
+ for (String str: command) {
+ if (str == null) {
+ continue;
+ }
+ changedCommands.add("'" + str.replaceAll("'", "'\"'\"'") +
"'");
+ }
+ return StringUtils.join(changedCommands, " ");
+ }
+
+ public static String scriptFilePath (String dir) {
+ return dir + FILE_PATH_SEPARATOR + "storm-worker-script.sh";
+ }
+
+ public static String containerFilePath (String dir) {
+ return dir + FILE_PATH_SEPARATOR + "launch_container.sh";
+ }
+
+ public static Object nullToZero (Object v) {
+ return (v != null ? v : 0);
+ }
+
+ /**
+ * Deletes a file or directory and its contents if it exists. Does not
+ * complain if the input is null or does not exist.
+ * @param path the path to the file or directory
+ */
+ public static void forceDelete(String path) throws IOException {
+ _instance.forceDeleteImpl(path);
+ }
+
+ // Non-static impl methods exist for mocking purposes.
+ protected void forceDeleteImpl(String path) throws IOException {
+ LOG.debug("Deleting path {}", path);
+ if (checkFileExists(path)) {
+ try {
+ FileUtils.forceDelete(new File(path));
+ } catch (FileNotFoundException ignored) {}
+ }
+ }
+
+ /**
+ * Creates a symbolic link to the target
+ * @param dir the parent directory of the link
+ * @param targetDir the parent directory of the link's target
+ * @param targetFilename the file name of the links target
+ * @param filename the file name of the link
+ * @return the path of the link if it did not exist, otherwise null
--- End diff --
recently this was changed not to return anything
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---