[ 
https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15192372#comment-15192372
 ] 

ASF GitHub Bot commented on STORM-1279:
---------------------------------------

Github user abhishekagarwal87 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1184#discussion_r55938152
  
    --- Diff: 
storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java ---
    @@ -0,0 +1,353 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.daemon.supervisor;
    +
    +import org.apache.commons.lang.StringUtils;
    +import org.apache.storm.Config;
    +import org.apache.storm.ProcessSimulator;
    +import org.apache.storm.generated.LSWorkerHeartbeat;
    +import org.apache.storm.localizer.LocalResource;
    +import org.apache.storm.localizer.Localizer;
    +import org.apache.storm.utils.ConfigUtils;
    +import org.apache.storm.utils.LocalState;
    +import org.apache.storm.utils.Time;
    +import org.apache.storm.utils.Utils;
    +import org.apache.zookeeper.ZooDefs;
    +import org.apache.zookeeper.data.ACL;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.net.URLDecoder;
    +import java.util.*;
    +
    +public class SupervisorUtils {
    +
    +    private static final Logger LOG = 
LoggerFactory.getLogger(SupervisorUtils.class);
    +
    +    private static final SupervisorUtils INSTANCE = new SupervisorUtils();
    +    private static SupervisorUtils _instance = INSTANCE;
    +    public static void setInstance(SupervisorUtils u) {
    +        _instance = u;
    +    }
    +    public static void resetInstance() {
    +        _instance = INSTANCE;
    +    }
    +
    +    public static Process workerLauncher(Map conf, String user, 
List<String> args, Map<String, String> environment, final String logPreFix,
    +            final Utils.ExitCodeCallable exitCodeCallback, File dir) 
throws IOException {
    +        if (StringUtils.isBlank(user)) {
    +            throw new IllegalArgumentException("User cannot be blank when 
calling workerLauncher.");
    +        }
    +        String wlinitial = (String) 
(conf.get(Config.SUPERVISOR_WORKER_LAUNCHER));
    +        String stormHome = 
ConfigUtils.concatIfNotNull(System.getProperty("storm.home"));
    +        String wl;
    +        if (StringUtils.isNotBlank(wlinitial)) {
    +            wl = wlinitial;
    +        } else {
    +            wl = stormHome + "/bin/worker-launcher";
    +        }
    +        List<String> commands = new ArrayList<>();
    +        commands.add(wl);
    +        commands.add(user);
    +        commands.addAll(args);
    +        LOG.info("Running as user: {} command: {}", user, commands);
    +        return Utils.launchProcess(commands, environment, logPreFix, 
exitCodeCallback, dir);
    +    }
    +
    +    public static int workerLauncherAndWait(Map conf, String user, 
List<String> args, final Map<String, String> environment, final String 
logPreFix)
    +            throws IOException {
    +        int ret = 0;
    +        Process process = workerLauncher(conf, user, args, environment, 
logPreFix, null, null);
    +        if (StringUtils.isNotBlank(logPreFix))
    +            Utils.readAndLogStream(logPreFix, process.getInputStream());
    +        try {
    +            process.waitFor();
    +        } catch (InterruptedException e) {
    +            LOG.info("{} interrupted.", logPreFix);
    +        }
    +        ret = process.exitValue();
    +        return ret;
    +    }
    +
    +    public static void setupStormCodeDir(Map conf, Map stormConf, String 
dir) throws IOException {
    +        if 
(Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) {
    +            String logPrefix = "setup conf for " + dir;
    +            List<String> commands = new ArrayList<>();
    +            commands.add("code-dir");
    +            commands.add(dir);
    +            workerLauncherAndWait(conf, (String) 
(stormConf.get(Config.TOPOLOGY_SUBMITTER_USER)), commands, null, logPrefix);
    +        }
    +    }
    +
    +    public static void rmrAsUser(Map conf, String id, String path) throws 
IOException {
    +        String user = Utils.getFileOwner(path);
    +        String logPreFix = "rmr " + id;
    +        List<String> commands = new ArrayList<>();
    +        commands.add("rmr");
    +        commands.add(path);
    +        SupervisorUtils.workerLauncherAndWait(conf, user, commands, null, 
logPreFix);
    +        if (Utils.checkFileExists(path)) {
    +            throw new RuntimeException(path + " was not deleted.");
    +        }
    +    }
    +
    +    /**
    +     * Given the blob information returns the value of the uncompress 
field, handling it either being a string or a boolean value, or if it's not 
specified then
    +     * returns false
    +     * 
    +     * @param blobInfo
    +     * @return
    +     */
    +    public static Boolean shouldUncompressBlob(Map<String, Object> 
blobInfo) {
    +        return new Boolean((String) blobInfo.get("uncompress"));
    --- End diff --
    
    It only handles string values and not boolean values. Correct definition 
would be
    ` return new Boolean(String.valueOf(blobInfo.get("uncompress")))`


> port backtype.storm.daemon.supervisor to java
> ---------------------------------------------
>
>                 Key: STORM-1279
>                 URL: https://issues.apache.org/jira/browse/STORM-1279
>             Project: Apache Storm
>          Issue Type: New Feature
>          Components: storm-core
>            Reporter: Robert Joseph Evans
>            Assignee: John Fang
>              Labels: java-migration, jstorm-merger
>         Attachments: Discussion about supervisor.pdf
>
>
> https://github.com/apache/storm/tree/jstorm-import/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor
>  as an example
> backtype.storm.event usage should be replaced with built-in java threadpools.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to