[ 
https://issues.apache.org/jira/browse/STORM-1226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15137456#comment-15137456
 ] 

ASF GitHub Bot commented on STORM-1226:
---------------------------------------

Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1074#discussion_r52207624
  
    --- Diff: storm-core/src/jvm/org/apache/storm/utils/Utils.java ---
    @@ -1370,9 +1469,974 @@ public static int toPositive(int number) {
         public static RuntimeException wrapInRuntime(Exception e){
             if (e instanceof RuntimeException){
                 return (RuntimeException)e;
    -        }else {
    +        } else {
                 return new RuntimeException(e);
             }
         }
    -}
     
    +    /**
    +     * Determines if a zip archive contains a particular directory.
    +     *
    +     * @param zipfile path to the zipped file
    +     * @param target directory being looked for in the zip.
    +     * @return boolean whether or not the directory exists in the zip.
    +     */
    +    public static boolean zipDoesContainDir(String zipfile, String target) 
throws IOException {
    +        List<ZipEntry> entries = (List<ZipEntry>)Collections.list(new 
ZipFile(zipfile).entries());
    +
    +        if(entries == null) {
    +            return false;
    +        }
    +
    +        String targetDir = target + "/";
    +        for(ZipEntry entry : entries) {
    +            String name = entry.getName();
    +            if(name.startsWith(targetDir)) {
    +                return true;
    +            }
    +        }
    +
    +        return false;
    +    }
    +
    +    /**
    +     * Joins any number of maps together into a single map, combining 
their values into
    +     * a list, maintaining values in the order the maps were passed in. 
Nulls are inserted
    +     * for given keys when the map does not contain that key.
    +     *
    +     * i.e. joinMaps({'a' => 1, 'b' => 2}, {'b' => 3}, {'a' => 4, 'c' => 
5}) ->
    +     *      {'a' => [1, null, 4], 'b' => [2, 3, null], 'c' => [null, null, 
5]}
    +     *
    +     * @param maps variable number of maps to join - order affects order 
of values in output.
    +     * @return combined map
    +     */
    +    public static <K, V> Map<K, List<V>> joinMaps(Map<K, V>... maps) {
    +        Map<K, List<V>> ret = new HashMap<>();
    +
    +        Set<K> keys = new HashSet<>();
    +
    +        for(Map<K, V> map : maps) {
    +            keys.addAll(map.keySet());
    +        }
    +
    +        for(Map<K, V> m : maps) {
    +            for(K key : keys) {
    +                V value = m.get(key);
    +
    +                if(!ret.containsKey(key)) {
    +                    ret.put(key, new ArrayList<V>());
    +                }
    +
    +                List<V> targetList = ret.get(key);
    +                targetList.add(value);
    +            }
    +        }
    +        return ret;
    +    }
    +
    +    /**
    +     * Fills up chunks out of a collection (given a maximum amount of 
chunks)
    +     *
    +     * i.e. partitionFixed(5, [1,2,3]) -> [[1,2,3]]
    +     *      partitionFixed(5, [1..9]) -> [[1,2], [3,4], [5,6], [7,8], [9]]
    +     *      partitionFixed(3, [1..10]) -> [[1,2,3,4], [5,6,7], [8,9,10]]
    +     * @param maxNumChunks the maximum number of chunks to return
    +     * @param coll the collection to be chunked up
    +     * @return a list of the chunks, which are themselves lists.
    +     */
    +    public static <T> List<List<T>> partitionFixed(int maxNumChunks, 
Collection<T> coll) {
    +        List<List<T>> ret = new ArrayList<>();
    +
    +        if(maxNumChunks == 0 || coll == null) {
    +            return ret;
    +        }
    +
    +        Map<Integer, Integer> parts = integerDivided(coll.size(), 
maxNumChunks);
    +
    +        // Keys sorted in descending order
    +        List<Integer> sortedKeys = new ArrayList<Integer>(parts.keySet());
    +        Collections.sort(sortedKeys, Collections.reverseOrder());
    +
    +
    +        Iterator<T> it = coll.iterator();
    +        for(Integer chunkSize : sortedKeys) {
    +            if(!it.hasNext()) { break; }
    +            Integer times = parts.get(chunkSize);
    +            for(int i = 0; i < times; i++) {
    +                if(!it.hasNext()) { break; }
    +                List<T> chunkList = new ArrayList<>();
    +                for(int j = 0; j < chunkSize; j++) {
    +                    if(!it.hasNext()) { break; }
    +                    chunkList.add(it.next());
    +                }
    +                ret.add(chunkList);
    +            }
    +        }
    +
    +        return ret;
    +    }
    +
    +    /**
    +     * Return a new instance of a pluggable specified in the conf.
    +     * @param conf The conf to read from.
    +     * @param configKey The key pointing to the pluggable class
    +     * @return an instance of the class or null if it is not specified.
    +     */
    +    public static Object getConfiguredClass(Map conf, Object configKey) {
    +        if (conf.containsKey(configKey)) {
    +            return newInstance((String)conf.get(configKey));
    +        }
    +        return null;
    +    }
    +
    +    public static String logsFilename(String stormId, int port) {
    +        return stormId + FILE_PATH_SEPARATOR + Integer.toString(port) + 
FILE_PATH_SEPARATOR + "worker.log";
    +    }
    +
    +    public static String eventLogsFilename(String stormId, int port) {
    +        return stormId + FILE_PATH_SEPARATOR + Integer.toString(port) + 
FILE_PATH_SEPARATOR + "events.log";
    +    }
    +
    +    public static Object readYamlFile(String yamlFile) {
    +        try (FileReader reader = new FileReader(yamlFile)) {
    +            return new Yaml(new SafeConstructor()).load(reader);
    +        }
    +        catch(Exception ex) {
    +            LOG.error("Failed to read yaml file.", ex);
    +        }
    +        return null;
    +    }
    +
    +    public static void setupDefaultUncaughtExceptionHandler() {
    +        Thread.setDefaultUncaughtExceptionHandler(new 
Thread.UncaughtExceptionHandler() {
    +                public void uncaughtException(Thread thread, Throwable 
thrown) {
    +                    try {
    +                        handleUncaughtException(thrown);
    +                    }
    +                    catch (Error err) {
    +                        LOG.error("Received error in main thread.. 
terminating server...", err);
    +                        Runtime.getRuntime().exit(-2);
    +                    }
    +                }
    +            });
    +    }
    +
    +    /**
    +     * Creates a new map with a string value in the map replaced with an 
    +     * equivalently-lengthed string of '#'.
    +     * @param m The map that a value will be redacted from
    +     * @param key The key pointing to the value to be redacted
    +     * @return a new map with the value redacted. The original map will 
not be modified.
    +     */
    +    public static Map redactValue(Map<Object, String> m, Object key) { 
    +        if(m.containsKey(key)) {
    +            HashMap<Object, String> newMap = new HashMap<>(m);
    +            String value = newMap.get(key);
    +            String redacted = new String(new 
char[value.length()]).replace("\0", "#");
    +            newMap.put(key, redacted);
    +            return newMap;
    +        }
    +        return m;
    +    }
    +
    +    public static void logThriftAccess(Integer requestId, InetAddress 
remoteAddress, Principal principal, String operation) {
    +        new ThriftAccessLogger().log(
    +            String.format("Request ID: {} access from: {} principal: {} 
operation: {}",
    +                          requestId, remoteAddress, principal, operation));
    +    }
    +
    +    /**
    +     * Make sure a given key name is valid for the storm config. 
    +     * Throw RuntimeException if the key isn't valid.
    +     * @param name The name of the config key to check.
    +     */
    +    public static void validateKeyName(String name) {
    +        Set<String> disallowedKeys = new HashSet<>();
    +        disallowedKeys.add("/");
    +        disallowedKeys.add(".");
    +        disallowedKeys.add(":");
    +        disallowedKeys.add("\\");
    +
    +        for(String key : disallowedKeys) {
    +            if( name.contains(key) ) {
    +                throw new RuntimeException("Key name cannot contain any of 
the following: " + disallowedKeys.toString());
    +            }
    +        }
    +        if(name.trim().isEmpty()) {
    +            throw new RuntimeException("Key name cannot be blank");
    +        }
    +    }
    +
    +    //Everything from here on is translated from the old util.clj 
(storm-core/src/clj/backtype.storm/util.clj)
    +
    +    public static final boolean IS_ON_WINDOWS = 
"Windows_NT".equals(System.getenv("OS"));
    +
    +    public static final String FILE_PATH_SEPARATOR = 
System.getProperty("file.separator");
    +
    +    public static final String CLASS_PATH_SEPARATOR = 
System.getProperty("path.separator");
    +
    +    public static final int SIGKILL = 9;
    +    public static final int SIGTERM = 15;
    +
    +
    +
    +    /**
    +     * Find the first item of coll for which pred.test(...) returns true.
    +     * @param pred The IPredicate to test for
    +     * @param coll The Collection of items to search through.
    +     * @return The first matching value in coll, or null if nothing 
matches.
    +     */
    +    public static Object findFirst (IPredicate pred, Collection coll) {
    +        if (coll == null || pred == null) {
    +            return null;
    +        } else {
    +            Iterator<Object> iter = coll.iterator();
    +            while(iter != null && iter.hasNext()) {
    +                Object obj = iter.next();
    +                if (pred.test(obj)) {
    +                    return obj;
    +                }
    +            }
    +            return null;
    +        }
    +    }
    +
    +    public static Object findFirst (IPredicate pred, Map map) {
    +        if (map == null || pred == null) {
    +            return null;
    +        } else {
    +            Iterator<Object> iter = map.entrySet().iterator();
    +            while(iter != null && iter.hasNext()) {
    +                Object obj = iter.next();
    +                if (pred.test(obj)) {
    +                    return obj;
    +                }
    +            }
    +            return null;
    +        }
    +    }
    +
    +    public static String localHostname () throws UnknownHostException {
    +        return _instance.localHostnameImpl();
    +    }
    +
    +    // Non-static impl methods exist for mocking purposes.
    +    protected String localHostnameImpl () throws UnknownHostException {
    +        return InetAddress.getLocalHost().getCanonicalHostName();
    +    }
    +
    +    private static String memoizedLocalHostnameString = null;
    +
    +    public static String memoizedLocalHostname () throws 
UnknownHostException {
    +        if (memoizedLocalHostnameString == null) {
    +            memoizedLocalHostnameString = localHostname();
    +        }
    +        return memoizedLocalHostnameString;
    +    }
    +
    +    /**
    +     * Gets the storm.local.hostname value, or tries to figure out the 
local hostname
    +     * if it is not set in the config.
    +     * @param conf The storm config to read from
    +     * @return a string representation of the hostname.
    +    */
    +    public static String hostname (Map<String, Object> conf) throws 
UnknownHostException  {
    +        if (conf == null) {
    +            return memoizedLocalHostname();
    +        }
    +        Object hostnameString = conf.get(Config.STORM_LOCAL_HOSTNAME);
    +        if (hostnameString == null ) {
    +            return memoizedLocalHostname();
    +        }
    +        if (hostnameString.equals("")) {
    +            return memoizedLocalHostname();
    +        }
    +        return hostnameString.toString();
    +    }
    +
    +    public static String uuid() {
    +        return UUID.randomUUID().toString();
    +    }
    +
    +    public static long secsToMillisLong(double secs) {
    +        return (long) (1000 * secs);
    +    }
    +
    +    public static Vector<String> tokenizePath (String path) {
    +        String[] tokens = path.split("/");
    +        Vector<String> outputs = new Vector<String>();
    +        if (tokens == null || tokens.length == 0) {
    +            return null;
    +        }
    +        for (String tok: tokens) {
    +            if (!tok.isEmpty()) {
    +                outputs.add(tok);
    +            }
    +        }
    +        return outputs;
    +    }
    +
    +    public static String parentPath(String path) {
    +        if (path == null) {
    +            return "/";
    +        }
    +        Vector<String> tokens = tokenizePath(path);
    +        int length = tokens.size();
    +        if (length == 0) {
    +            return "/";
    +        }
    +        String output = "";
    +        for (int i = 0; i < length - 1; i++) {  //length - 1 to mimic 
"butlast" from the old clojure code
    +            output = output + "/" + tokens.get(i);
    +        }
    +        return output;
    +    }
    +
    +    public static String toksToPath (Vector<String> toks) {
    +        if (toks == null || toks.size() == 0) {
    +            return "/";
    +        }
    +
    +        String output = "";
    +        for (int i = 0; i < toks.size(); i++) {
    +            output = output + "/" + toks.get(i);
    +        }
    +        return output;
    +    }
    +    public static String normalizePath (String path) {
    +        return toksToPath(tokenizePath(path));
    +    }
    +
    +    public static void exitProcess (int val, Object... msg) {
    +        StringBuilder errorMessage = new StringBuilder();
    +        errorMessage.append("halting process: ");
    +        for (Object oneMessage: msg) {
    +            errorMessage.append(oneMessage);
    +        }
    +        String combinedErrorMessage = errorMessage.toString();
    +        LOG.error(combinedErrorMessage, new 
RuntimeException(combinedErrorMessage));
    +        Runtime.getRuntime().exit(val);
    +    }
    +
    +    public static Object defaulted(Object val, Object defaultObj) {
    +        if (val != null) {
    +            return val;
    +        } else {
    +            return defaultObj;
    +        }
    +    }
    +
    +    /**
    +     * "{:a 1 :b 1 :c 2} -> {1 [:a :b] 2 :c}"
    +     *
    +     * Example usage in java:
    +     *  Map<Integer, String> tasks;
    +     *  Map<String, List<Integer>> componentTasks = 
Utils.reverse_map(tasks);
    +     *
    +     * @param map
    +     * @return
    +     */
    +    public static <K, V> HashMap<V, List<K>> reverseMap(Map<K, V> map) {
    +        HashMap<V, List<K>> rtn = new HashMap<V, List<K>>();
    +        if (map == null) {
    +            return rtn;
    +        }
    +        for (Entry<K, V> entry : map.entrySet()) {
    +            K key = entry.getKey();
    +            V val = entry.getValue();
    +            List<K> list = rtn.get(val);
    +            if (list == null) {
    +                list = new ArrayList<K>();
    +                rtn.put(entry.getValue(), list);
    +            }
    +            list.add(key);
    +        }
    +        return rtn;
    +    }
    +
    +    /**
    +     * "{:a 1 :b 1 :c 2} -> {1 [:a :b] 2 :c}"
    +     *
    +     */
    +    public static HashMap reverseMap(List listSeq) {
    +        HashMap<Object, List<Object>> rtn = new HashMap();
    +        if (listSeq == null) {
    +            return rtn;
    +        }
    +        for (Object entry : listSeq) {
    +            List listEntry = (List) entry;
    +            Object key = listEntry.get(0);
    +            Object val = listEntry.get(1);
    +            List list = rtn.get(val);
    +            if (list == null) {
    +                list = new ArrayList<Object>();
    +                rtn.put(val, list);
    +            }
    +            list.add(key);
    +        }
    +        return rtn;
    +    }
    +
    +
    +    /**
    +     * Gets the pid of this JVM, because Java doesn't provide a real way 
to do this.
    +     *
    +     * @return
    +     */
    +    public static String processPid() throws RuntimeException {
    +        String name = ManagementFactory.getRuntimeMXBean().getName();
    +        String[] split = name.split("@");
    +        if (split.length != 2) {
    +            throw new RuntimeException("Got unexpected process name: " + 
name);
    +        }
    +        return split[0];
    +    }
    +
    +    public static int execCommand(String command) throws ExecuteException, 
IOException {
    +        String[] cmdlist = command.split(" ");
    +        CommandLine cmd = new CommandLine(cmdlist[0]);
    +        for (int i = 1; i < cmdlist.length; i++) {
    +            cmd.addArgument(cmdlist[i]);
    +        }
    +
    +        DefaultExecutor exec = new DefaultExecutor();
    +        return exec.execute(cmd);
    +    }
    +
    +    /**
    +     * Extra dir from the jar to destdir
    +     *
    +     * @param jarpath
    +     * @param dir
    +     * @param destdir
    +     *
    +    (with-open [jarpath (ZipFile. jarpath)]
    +    (let [entries (enumeration-seq (.entries jarpath))]
    +    (doseq [file (filter (fn [entry](and (not (.isDirectory entry)) 
(.startsWith (.getName entry) dir))) entries)]
    +    (.mkdirs (.getParentFile (File. destdir (.getName file))))
    +    (with-open [out (FileOutputStream. (File. destdir (.getName file)))]
    +    (io/copy (.getInputStream jarpath file) out)))))
    +
    +     */
    +    public static void extractDirFromJar(String jarpath, String dir, 
String destdir) {
    +        JarFile jarFile = null;
    +        FileOutputStream out = null;
    +        InputStream in = null;
    +        try {
    +            jarFile = new JarFile(jarpath);
    +            Enumeration<JarEntry> jarEnums = jarFile.entries();
    +            while (jarEnums.hasMoreElements()) {
    +                JarEntry entry = jarEnums.nextElement();
    +                if (!entry.isDirectory() && 
entry.getName().startsWith(dir)) {
    +                    File aFile = new File(destdir, entry.getName());
    +                    aFile.getParentFile().mkdirs();
    +                    out = new FileOutputStream(aFile);
    +                    in = jarFile.getInputStream(entry);
    +                    IOUtils.copy(in, out);
    +                    out.close();
    +                    in.close();
    +                }
    +            }
    +        } catch (IOException e) {
    +            LOG.info("Could not extract {} from {}", dir, jarpath);
    +        } finally {
    +            if (jarFile != null) {
    +                try {
    +                    jarFile.close();
    +                } catch (IOException e) {
    +                    throw new RuntimeException(
    +                            "Something really strange happened when trying 
to close the jar file" + jarpath);
    +                }
    +            }
    +            if (out != null) {
    +                try {
    +                    out.close();
    +                } catch (IOException e) {
    +                    throw new RuntimeException(
    +                            "Something really strange happened when trying 
to close the output for jar file" + jarpath);
    +                }
    +            }
    +            if (in != null) {
    +                try {
    +                    in.close();
    +                } catch (IOException e) {
    +                    throw new RuntimeException(
    +                            "Something really strange happened when trying 
to close the input for jar file" + jarpath);
    +                }
    +            }
    +        }
    +
    +    }
    +
    +    public static int sendSignalToProcess(long pid, int signum) {
    +        int retval = 0;
    +        try {
    +            String killString = null;
    +            if (isOnWindows()) {
    +                if (signum == SIGKILL) {
    +                    killString = "taskkill /f /pid ";
    +                } else {
    +                    killString = "taskkill /pid ";
    +                }
    +            } else {
    +                killString = "kill -" + signum + " ";
    +            }
    +            killString = killString + pid;
    +            retval = execCommand(killString);
    +        } catch (ExecuteException e) {
    +            LOG.info("Error when trying to kill " + pid + ". Process is 
probably already dead.");
    +        } catch (IOException e) {
    +            LOG.info("IOException Error when trying to kill " + pid + ".");
    +        } finally {
    +            return retval;
    +        }
    +    }
    +
    +    public static int forceKillProcess (long pid) {
    +        return sendSignalToProcess(pid, SIGKILL);
    +    }
    +
    +    public static int forceKillProcess (String pid) {
    +        return sendSignalToProcess(Long.parseLong(pid), SIGKILL);
    +    }
    +
    +    public static int killProcessWithSigTerm (long pid) {
    +        return sendSignalToProcess(pid, SIGTERM);
    +    }
    +    public static int killProcessWithSigTerm (String pid) {
    +        return sendSignalToProcess(Long.parseLong(pid), SIGTERM);
    +    }
    +
    +    /*
    +        Adds the user supplied function as a shutdown hook for cleanup.
    +        Also adds a function that sleeps for a second and then sends kill 
-9
    +        to process to avoid any zombie process in case cleanup function 
hangs.
    +     */
    +    public static void addShutdownHookWithForceKillIn1Sec (Runnable func) {
    +        Runnable sleepKill = new Runnable() {
    +            @Override
    +            public void run() {
    +                try {
    +                    Time.sleepSecs(1);
    +                    Runtime.getRuntime().halt(20);
    +                } catch (Exception e) {
    +                    LOG.warn("Exception in the ShutDownHook: " + e);
    +                }
    +            }
    +        };
    +        Runtime.getRuntime().addShutdownHook(new Thread(func));
    +        Runtime.getRuntime().addShutdownHook(new Thread(sleepKill));
    +    }
    +
    +    /**
    +     * Returns the combined string, escaped for posix shell.
    +     * @param command the list of strings to be combined
    +     * @return the resulting command string
    +     */
    +    public static String shellCmd (List<String> command) {
    +        List<String> changedCommands = new LinkedList<>();
    +        for (String str: command) {
    +            if (str == null) {
    +                continue;
    +            }
    +            changedCommands.add("'" + str.replaceAll("'", "'\"'\"'") + 
"'");
    +        }
    +        return StringUtils.join(changedCommands, " ");
    +    }
    +
    +    public static String scriptFilePath (String dir) {
    +        return dir + FILE_PATH_SEPARATOR + "storm-worker-script.sh";
    +    }
    +
    +    public static String containerFilePath (String dir) {
    +        return dir + FILE_PATH_SEPARATOR + "launch_container.sh";
    +    }
    +
    +    public static void throwRuntime (Object... strings) {
    +        String combinedErrorMessage = "";
    +        for (Object oneMessage: strings) {
    +            combinedErrorMessage = combinedErrorMessage + 
oneMessage.toString();
    +        }
    +        throw new RuntimeException(combinedErrorMessage);
    --- End diff --
    
    String concatenation is broken here if a nil/null is passed in.  Can at 
least switch to using StringBuilder, and I would really prefer to just see this 
inlined everywhere.  Is it really that much simpler to do 
    
    ```
    (Utils/throwRuntime [a b c])
    ```
    
    over
    
    ```
    (throw (RuntimeException. (str a b c))
    ```


> Port backtype.storm.util to java
> --------------------------------
>
>                 Key: STORM-1226
>                 URL: https://issues.apache.org/jira/browse/STORM-1226
>             Project: Apache Storm
>          Issue Type: New Feature
>          Components: storm-core
>            Reporter: Robert Joseph Evans
>            Assignee: Reza Farivar
>              Labels: java-migration, jstorm-merger
>
> Port backtype.storm.util from clojure to java.  In as many instances as 
> possible the same interface should be maintained, and calls to clojure 
> functions in the rest of the code should be replaces with calls to the 
> corresponding java code.
> Some similar functions can be found at  
> https://github.com/apache/storm/blob/jstorm-import/jstorm-core/src/main/java/com/alibaba/jstorm/utils/JStormUtils.java
> Although they are not identical.
> For function callbacks we may need to evaluate adding in appropriate callback 
> interfaces instead.  Please try to avoid using clojure internal java classes 
> unless necessary.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to