[ 
https://issues.apache.org/jira/browse/STORM-1226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15137559#comment-15137559
 ] 

ASF GitHub Bot commented on STORM-1226:
---------------------------------------

Github user abhishekagarwal87 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1074#discussion_r52215700
  
    --- Diff: storm-core/src/jvm/org/apache/storm/utils/Utils.java ---
    @@ -1370,9 +1469,974 @@ public static int toPositive(int number) {
         public static RuntimeException wrapInRuntime(Exception e){
             if (e instanceof RuntimeException){
                 return (RuntimeException)e;
    -        }else {
    +        } else {
                 return new RuntimeException(e);
             }
         }
    -}
     
    +    /**
    +     * Determines if a zip archive contains a particular directory.
    +     *
    +     * @param zipfile path to the zipped file
    +     * @param target directory being looked for in the zip.
    +     * @return boolean whether or not the directory exists in the zip.
    +     */
    +    public static boolean zipDoesContainDir(String zipfile, String target) 
throws IOException {
    +        List<ZipEntry> entries = (List<ZipEntry>)Collections.list(new 
ZipFile(zipfile).entries());
    +
    +        if(entries == null) {
    +            return false;
    +        }
    +
    +        String targetDir = target + "/";
    +        for(ZipEntry entry : entries) {
    +            String name = entry.getName();
    +            if(name.startsWith(targetDir)) {
    +                return true;
    +            }
    +        }
    +
    +        return false;
    +    }
    +
    +    /**
    +     * Joins any number of maps together into a single map, combining 
their values into
    +     * a list, maintaining values in the order the maps were passed in. 
Nulls are inserted
    +     * for given keys when the map does not contain that key.
    +     *
    +     * i.e. joinMaps({'a' => 1, 'b' => 2}, {'b' => 3}, {'a' => 4, 'c' => 
5}) ->
    +     *      {'a' => [1, null, 4], 'b' => [2, 3, null], 'c' => [null, null, 
5]}
    +     *
    +     * @param maps variable number of maps to join - order affects order 
of values in output.
    +     * @return combined map
    +     */
    +    public static <K, V> Map<K, List<V>> joinMaps(Map<K, V>... maps) {
    +        Map<K, List<V>> ret = new HashMap<>();
    +
    +        Set<K> keys = new HashSet<>();
    +
    +        for(Map<K, V> map : maps) {
    +            keys.addAll(map.keySet());
    +        }
    +
    +        for(Map<K, V> m : maps) {
    +            for(K key : keys) {
    +                V value = m.get(key);
    +
    +                if(!ret.containsKey(key)) {
    +                    ret.put(key, new ArrayList<V>());
    +                }
    +
    +                List<V> targetList = ret.get(key);
    +                targetList.add(value);
    +            }
    +        }
    +        return ret;
    +    }
    +
    +    /**
    +     * Fills up chunks out of a collection (given a maximum amount of 
chunks)
    +     *
    +     * i.e. partitionFixed(5, [1,2,3]) -> [[1,2,3]]
    +     *      partitionFixed(5, [1..9]) -> [[1,2], [3,4], [5,6], [7,8], [9]]
    +     *      partitionFixed(3, [1..10]) -> [[1,2,3,4], [5,6,7], [8,9,10]]
    +     * @param maxNumChunks the maximum number of chunks to return
    +     * @param coll the collection to be chunked up
    +     * @return a list of the chunks, which are themselves lists.
    +     */
    +    public static <T> List<List<T>> partitionFixed(int maxNumChunks, 
Collection<T> coll) {
    +        List<List<T>> ret = new ArrayList<>();
    +
    +        if(maxNumChunks == 0 || coll == null) {
    +            return ret;
    +        }
    +
    +        Map<Integer, Integer> parts = integerDivided(coll.size(), 
maxNumChunks);
    +
    +        // Keys sorted in descending order
    +        List<Integer> sortedKeys = new ArrayList<Integer>(parts.keySet());
    +        Collections.sort(sortedKeys, Collections.reverseOrder());
    +
    +
    +        Iterator<T> it = coll.iterator();
    +        for(Integer chunkSize : sortedKeys) {
    +            if(!it.hasNext()) { break; }
    +            Integer times = parts.get(chunkSize);
    +            for(int i = 0; i < times; i++) {
    +                if(!it.hasNext()) { break; }
    +                List<T> chunkList = new ArrayList<>();
    +                for(int j = 0; j < chunkSize; j++) {
    +                    if(!it.hasNext()) { break; }
    +                    chunkList.add(it.next());
    +                }
    +                ret.add(chunkList);
    +            }
    +        }
    +
    +        return ret;
    +    }
    +
    +    /**
    +     * Return a new instance of a pluggable specified in the conf.
    +     * @param conf The conf to read from.
    +     * @param configKey The key pointing to the pluggable class
    +     * @return an instance of the class or null if it is not specified.
    +     */
    +    public static Object getConfiguredClass(Map conf, Object configKey) {
    +        if (conf.containsKey(configKey)) {
    +            return newInstance((String)conf.get(configKey));
    +        }
    +        return null;
    +    }
    +
    +    public static String logsFilename(String stormId, int port) {
    +        return stormId + FILE_PATH_SEPARATOR + Integer.toString(port) + 
FILE_PATH_SEPARATOR + "worker.log";
    +    }
    +
    +    public static String eventLogsFilename(String stormId, int port) {
    +        return stormId + FILE_PATH_SEPARATOR + Integer.toString(port) + 
FILE_PATH_SEPARATOR + "events.log";
    +    }
    +
    +    public static Object readYamlFile(String yamlFile) {
    +        try (FileReader reader = new FileReader(yamlFile)) {
    +            return new Yaml(new SafeConstructor()).load(reader);
    +        }
    +        catch(Exception ex) {
    +            LOG.error("Failed to read yaml file.", ex);
    +        }
    +        return null;
    +    }
    +
    +    public static void setupDefaultUncaughtExceptionHandler() {
    +        Thread.setDefaultUncaughtExceptionHandler(new 
Thread.UncaughtExceptionHandler() {
    +                public void uncaughtException(Thread thread, Throwable 
thrown) {
    +                    try {
    +                        handleUncaughtException(thrown);
    +                    }
    +                    catch (Error err) {
    +                        LOG.error("Received error in main thread.. 
terminating server...", err);
    +                        Runtime.getRuntime().exit(-2);
    +                    }
    +                }
    +            });
    +    }
    +
    +    /**
    +     * Creates a new map with a string value in the map replaced with an 
    +     * equivalently-lengthed string of '#'.
    +     * @param m The map that a value will be redacted from
    +     * @param key The key pointing to the value to be redacted
    +     * @return a new map with the value redacted. The original map will 
not be modified.
    +     */
    +    public static Map redactValue(Map<Object, String> m, Object key) { 
    +        if(m.containsKey(key)) {
    +            HashMap<Object, String> newMap = new HashMap<>(m);
    +            String value = newMap.get(key);
    +            String redacted = new String(new 
char[value.length()]).replace("\0", "#");
    +            newMap.put(key, redacted);
    +            return newMap;
    +        }
    +        return m;
    +    }
    +
    +    public static void logThriftAccess(Integer requestId, InetAddress 
remoteAddress, Principal principal, String operation) {
    +        new ThriftAccessLogger().log(
    +            String.format("Request ID: {} access from: {} principal: {} 
operation: {}",
    +                          requestId, remoteAddress, principal, operation));
    +    }
    +
    +    /**
    +     * Make sure a given key name is valid for the storm config. 
    +     * Throw RuntimeException if the key isn't valid.
    +     * @param name The name of the config key to check.
    +     */
    +    public static void validateKeyName(String name) {
    +        Set<String> disallowedKeys = new HashSet<>();
    --- End diff --
    
    The `disallowedKeys` set can be declared and initialized statically instead of being created inside the method.


> Port backtype.storm.util to java
> --------------------------------
>
>                 Key: STORM-1226
>                 URL: https://issues.apache.org/jira/browse/STORM-1226
>             Project: Apache Storm
>          Issue Type: New Feature
>          Components: storm-core
>            Reporter: Robert Joseph Evans
>            Assignee: Reza Farivar
>              Labels: java-migration, jstorm-merger
>
> Port backtype.storm.util from clojure to java.  In as many instances as 
> possible the same interface should be maintained, and calls to clojure 
> functions in the rest of the code should be replaced with calls to the 
> corresponding java code.
> Some similar functions can be found at
> https://github.com/apache/storm/blob/jstorm-import/jstorm-core/src/main/java/com/alibaba/jstorm/utils/JStormUtils.java
> although they are not identical.
> For function callbacks we may need to evaluate adding in appropriate callback 
> interfaces instead.  Please try to avoid using clojure internal java classes 
> unless necessary.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to