[ 
https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15218336#comment-15218336
 ] 

ASF GitHub Bot commented on STORM-1279:
---------------------------------------

Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1257#discussion_r57926189
  
    --- Diff: 
storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/RunProfilerActions.java
 ---
    @@ -0,0 +1,221 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.storm.daemon.supervisor.timer;
    +
    +import org.apache.storm.Config;
    +import org.apache.storm.cluster.IStormClusterState;
    +import org.apache.storm.daemon.supervisor.SupervisorData;
    +import org.apache.storm.daemon.supervisor.SupervisorUtils;
    +import org.apache.storm.generated.ProfileAction;
    +import org.apache.storm.generated.ProfileRequest;
    +import org.apache.storm.utils.ConfigUtils;
    +import org.apache.storm.utils.Utils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.BufferedReader;
    +import java.io.File;
    +import java.io.FileReader;
    +import java.io.IOException;
    +import java.util.*;
    +
    +public class RunProfilerActions implements Runnable {
    +    private static Logger LOG = 
LoggerFactory.getLogger(RunProfilerActions.class);
    +
    +    private Map conf;
    +    private IStormClusterState stormClusterState;
    +    private String hostName;
    +
    +    private String profileCmd;
    +
    +    private SupervisorData supervisorData;
    +
    +    private class ActionExitCallback implements Utils.ExitCodeCallable {
    +        private String stormId;
    +        private ProfileRequest profileRequest;
    +        private String logPrefix;
    +
    +        public ActionExitCallback(String stormId, ProfileRequest 
profileRequest, String logPrefix) {
    +            this.stormId = stormId;
    +            this.profileRequest = profileRequest;
    +            this.logPrefix = logPrefix;
    +        }
    +
    +        @Override
    +        public Object call() throws Exception {
    +            return null;
    +        }
    +
    +        @Override
    +        public Object call(int exitCode) {
    +            LOG.info("{} profile-action exited for {}", logPrefix, 
exitCode);
    +            try {
    +                stormClusterState.deleteTopologyProfileRequests(stormId, 
profileRequest);
    +            } catch (Exception e) {
    +                LOG.warn("failed delete profileRequest: " + 
profileRequest);
    +            }
    +            return null;
    +        }
    +    }
    +
    +    public RunProfilerActions(SupervisorData supervisorData) {
    +        this.conf = supervisorData.getConf();
    +        this.stormClusterState = supervisorData.getStormClusterState();
    +        this.hostName = supervisorData.getHostName();
    +        this.profileCmd = (String) 
(conf.get(Config.WORKER_PROFILER_COMMAND));
    +        this.supervisorData = supervisorData;
    +    }
    +
    +    @Override
    +    public void run() {
    +        Map<String, List<ProfileRequest>> stormIdToActions = 
supervisorData.getStormIdToProfileActions().get();
    +        try {
    +            for (Map.Entry<String, List<ProfileRequest>> entry : 
stormIdToActions.entrySet()) {
    +                String stormId = entry.getKey();
    +                List<ProfileRequest> requests = entry.getValue();
    +                if (requests != null) {
    +                    for (ProfileRequest profileRequest : requests) {
    +                        if 
(profileRequest.get_nodeInfo().get_node().equals(hostName)) {
    +                            boolean stop = System.currentTimeMillis() > 
profileRequest.get_time_stamp() ? true : false;
    +                            Long port = 
profileRequest.get_nodeInfo().get_port().iterator().next();
    +                            String targetDir = 
ConfigUtils.workerArtifactsRoot(conf, String.valueOf(port));
    +                            Map stormConf = 
ConfigUtils.readSupervisorStormConf(conf, stormId);
    +
    +                            String user = null;
    +                            if 
(stormConf.get(Config.TOPOLOGY_SUBMITTER_USER) != null) {
    +                                user = (String) 
(stormConf.get(Config.TOPOLOGY_SUBMITTER_USER));
    +                            }
    +                            Map<String, String> env = null;
    +                            if (stormConf.get(Config.TOPOLOGY_ENVIRONMENT) 
!= null) {
    +                                env = (Map<String, String>) 
stormConf.get(Config.TOPOLOGY_ENVIRONMENT);
    +                            } else {
    +                                env = new HashMap<String, String>();
    +                            }
    +
    +                            String str = 
ConfigUtils.workerArtifactsPidPath(conf, stormId, port.intValue());
    +                            StringBuilder stringBuilder = new 
StringBuilder();
    +                            FileReader reader = null;
    +                            BufferedReader br = null;
    +                            try {
    +                                reader = new FileReader(str);
    +                                br = new BufferedReader(reader);
    +                                int c;
    +                                while ((c = br.read()) >= 0) {
    +                                    stringBuilder.append(c);
    +                                }
    +                            } catch (IOException e) {
    +                                if (reader != null)
    +                                    reader.close();
    +                                if (br != null)
    +                                    br.close();
    +                            }
    +                            String workerPid = 
stringBuilder.toString().trim();
    +                            ProfileAction profileAction = 
profileRequest.get_action();
    +                            String logPrefix = "ProfilerAction process " + 
stormId + ":" + port + " PROFILER_ACTION: " + profileAction + " ";
    +
    +                            // Until PROFILER_STOP action is invalid, keep 
launching profiler start in case worker restarted
    +                            // The profiler plugin script validates if JVM 
is recording before starting another recording.
    +                            String command = mkCommand(profileAction, 
stop, workerPid, targetDir);
    +                            List<String> listCommand = new ArrayList<>();
    +                            if (command != null) {
    +                                
listCommand.addAll(Arrays.asList(command.split(" ")));
    +                            }
    --- End diff --
    
    Instead of returning a string and converting it to a List, can we just 
return the list? That way we don't have to worry about white space anywhere in 
the name of the profile command, etc.


> port backtype.storm.daemon.supervisor to java
> ---------------------------------------------
>
>                 Key: STORM-1279
>                 URL: https://issues.apache.org/jira/browse/STORM-1279
>             Project: Apache Storm
>          Issue Type: New Feature
>          Components: storm-core
>            Reporter: Robert Joseph Evans
>            Assignee: John Fang
>              Labels: java-migration, jstorm-merger
>         Attachments: Discussion about supervisor.pdf
>
>
> https://github.com/apache/storm/tree/jstorm-import/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor
>  as an example
> backtype.storm.event usage should be replaced with built-in java threadpools.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to