Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1642#discussion_r78957510
  
    --- Diff: 
storm-core/src/jvm/org/apache/storm/daemon/supervisor/BasicContainer.java ---
    @@ -0,0 +1,644 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.daemon.supervisor;
    +
    +import java.io.File;
    +import java.io.FilenameFilter;
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.Iterator;
    +import java.util.HashMap;
    +import java.util.LinkedList;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Map.Entry;
    +import java.util.stream.Collectors;
    +
    +import org.apache.commons.lang.StringUtils;
    +import org.apache.storm.Config;
    +import org.apache.storm.container.ResourceIsolationInterface;
    +import org.apache.storm.generated.LocalAssignment;
    +import org.apache.storm.generated.ProfileAction;
    +import org.apache.storm.generated.ProfileRequest;
    +import org.apache.storm.generated.StormTopology;
    +import org.apache.storm.generated.WorkerResources;
    +import org.apache.storm.utils.ConfigUtils;
    +import org.apache.storm.utils.LocalState;
    +import org.apache.storm.utils.Utils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.google.common.base.Joiner;
    +import com.google.common.collect.Lists;
    +
    +/**
    + * A container that runs processes on the local box.
    + */
    +public class BasicContainer extends Container {
    +    private static final Logger LOG = 
LoggerFactory.getLogger(BasicContainer.class);
    +    private static final FilenameFilter jarFilter = new FilenameFilter() {
    +        @Override
    +        public boolean accept(File dir, String name) {
    +            return name.endsWith(".jar");
    +        }
    +    };
    +    private static final Joiner CPJ = 
    +            Joiner.on(Utils.CLASS_PATH_SEPARATOR).skipNulls();
    +    
    +    protected final LocalState _localState;
    +    protected final String _profileCmd;
    +    protected final String _stormHome = System.getProperty("storm.home");
    +    protected volatile boolean _exitedEarly = false;
    +
    +    private class ProcessExitCallback implements ExitCodeCallback {
    +        private final String _logPrefix;
    +
    +        public ProcessExitCallback(String logPrefix) {
    +            _logPrefix = logPrefix;
    +        }
    +
    +        @Override
    +        public void call(int exitCode) {
    +            LOG.info("{} exited with code: {}", _logPrefix, exitCode);
    +            _exitedEarly = true;
    +        }
    +    }
    +    
    +    /**
    +     * Create a new BasicContainer
    +     * @param type the type of container being made.
    +     * @param conf the supervisor config
    +     * @param supervisorId the ID of the supervisor this is a part of.
    +     * @param port the port the container is on.  Should be <= 0 if only a 
partial recovery
    +     * @param assignment the assignment for this container. Should be null 
if only a partial recovery.
    +     * @param resourceIsolationManager used to isolate resources for a 
container can be null if no isolation is used.
    +     * @param localState the local state of the supervisor.  May be null 
if partial recovery
    +     * @param workerId the id of the worker to use.  Must not be null if 
doing a partial recovery.
    +     * @param ops file system operations (mostly for testing) if null a 
new one is made
    +     * @param topoConf the config of the topology (mostly for testing) if 
null 
    +     * and not a partial recovery the real conf is read.
    +     * @param profileCmd the command to use when profiling (used for 
testing)
    +     */
    +    public BasicContainer(ContainerType type, Map<String, Object> conf, 
String supervisorId, int port,
    +            LocalAssignment assignment, ResourceIsolationInterface 
resourceIsolationManager,
    +            LocalState localState, String workerId, Map<String, Object> 
topoConf, 
    +            AdvancedFSOps ops, String profileCmd) throws IOException {
    +        super(type, conf, supervisorId, port, assignment, 
resourceIsolationManager, workerId, topoConf, ops);
    +        assert(localState != null);
    +        _localState = localState;
    +
    +        if (type.isRecovery() && !type.isOnlyKillable()) {
    +            synchronized (localState) {
    +                String wid = null;
    +                Map<String, Integer> workerToPort = 
localState.getApprovedWorkers();
    +                for (Map.Entry<String, Integer> entry : 
workerToPort.entrySet()) {
    +                    if (port == entry.getValue().intValue()) {
    +                        wid = entry.getKey();
    +                    }
    +                }
    +                if (wid == null) {
    +                    throw new ContainerRecoveryException("Could not find 
worker id for " + port + " " + assignment);
    +                }
    +                LOG.info("Recovered Worker {}", wid);
    +                _workerId = wid;
    +            }
    +        } else if (_workerId == null){
    +            createNewWorkerId();
    +        }
    +
    +        if (profileCmd == null) {
    +            profileCmd = _stormHome + Utils.FILE_PATH_SEPARATOR + "bin" + 
Utils.FILE_PATH_SEPARATOR
    +                    + conf.get(Config.WORKER_PROFILER_COMMAND);
    +        }
    +        _profileCmd = profileCmd;
    +    }
    +
    +    /**
    +     * Create a new worker ID for this process and store in in this object 
and
    +     * in the local state.  Never call this if a worker is currently up 
and running.
    +     * We will lose track of the process.
    +     */
    +    protected void createNewWorkerId() {
    +        _type.assertFull();
    +        assert(_workerId == null);
    +        synchronized (_localState) {
    +            _workerId = Utils.uuid();
    +            Map<String, Integer> workerToPort = 
_localState.getApprovedWorkers();
    +            if (workerToPort == null) {
    +                workerToPort = new HashMap<>(1);
    +            }
    +            removeWorkersOn(workerToPort, _port);
    +            workerToPort.put(_workerId, _port);
    +            _localState.setApprovedWorkers(workerToPort);
    +            LOG.info("Created Worker ID {}", _workerId);
    +        }
    +    }
    +
    +    private static void removeWorkersOn(Map<String, Integer> workerToPort, 
int _port) {
    +        for (Iterator<Entry<String, Integer>> i = 
workerToPort.entrySet().iterator(); i.hasNext();) {
    +            Entry<String, Integer> found = i.next();
    +            if (_port == found.getValue().intValue()) {
    +                LOG.warn("Deleting worker {} from state", found.getKey());
    +                i.remove();
    +            }
    +        }
    +    }
    +
    +    @Override
    +    public void cleanUpForRestart() throws IOException {
    +        String origWorkerId = _workerId;
    +        super.cleanUpForRestart();
    +        synchronized (_localState) {
    +            Map<String, Integer> workersToPort = 
_localState.getApprovedWorkers();
    +            workersToPort.remove(origWorkerId);
    +            removeWorkersOn(workersToPort, _port);
    +            _localState.setApprovedWorkers(workersToPort);
    +            LOG.info("Removed Worker ID {}", origWorkerId);
    +        }
    +    }
    +
    +    @Override
    +    public void relaunch() throws IOException {
    +        _type.assertFull();
    +        //We are launching it now...
    +        _type = ContainerType.LAUNCH;
    +        createNewWorkerId();
    +        setup();
    +        launch();
    +    }
    +
    +    @Override
    +    public boolean didMainProcessExit() {
    +        return _exitedEarly;
    +    }
    +
    +    /**
    +     * Run the given command for profiling
    +     * 
    +     * @param command
    +     *            the command to run
    +     * @param env
    +     *            the environment to run the command
    +     * @param logPrefix
    +     *            the prefix to include in the logs
    +     * @param targetDir
    +     *            the working directory to run the command in
    +     * @return true if it ran successfully, else false
    +     * @throws IOException
    +     *             on any error
    +     * @throws InterruptedException
    +     *             if interrupted wile waiting for the process to exit.
    +     */
    +    protected boolean runProfilingCommand(List<String> command, 
Map<String, String> env, String logPrefix,
    +            File targetDir) throws IOException, InterruptedException {
    +        _type.assertFull();
    +        Process p = SupervisorUtils.launchProcess(command, env, logPrefix, 
null, targetDir);
    +        int ret = p.waitFor();
    +        return ret == 0;
    +    }
    +
    +    @Override
    +    public boolean runProfiling(ProfileRequest request, boolean stop) 
throws IOException, InterruptedException {
    +        _type.assertFull();
    +        String targetDir = ConfigUtils.workerArtifactsRoot(_conf, 
_topologyId, _port);
    +
    +        @SuppressWarnings("unchecked")
    +        Map<String, String> env = (Map<String, String>) 
_topoConf.get(Config.TOPOLOGY_ENVIRONMENT);
    +        if (env == null) {
    +            env = new HashMap<String, String>();
    +        }
    +
    +        String str = ConfigUtils.workerArtifactsPidPath(_conf, 
_topologyId, _port);
    +
    +        String workerPid = _ops.slurpString(new File(str)).trim();
    +
    +        ProfileAction profileAction = request.get_action();
    +        String logPrefix = "ProfilerAction process " + _topologyId + ":" + 
_port + " PROFILER_ACTION: " + profileAction
    +                + " ";
    +
    +        List<String> command = mkProfileCommand(profileAction, stop, 
workerPid, targetDir);
    +
    +        File targetFile = new File(targetDir);
    +        if (command.size() > 0) {
    +            return runProfilingCommand(command, env, logPrefix, 
targetFile);
    +        }
    +        LOG.warn("PROFILING REQUEST NOT SUPPORTED {} IGNORED...", request);
    +        return true;
    --- End diff --
    
    Shouldn't this return false because profiling didn't succeed?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to