[ 
https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15220831#comment-15220831
 ] 

ASF GitHub Bot commented on STORM-1279:
---------------------------------------

Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1257#discussion_r58142341
  
    --- Diff: 
storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/RunProfilerActions.java
 ---
    @@ -0,0 +1,214 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.storm.daemon.supervisor.timer;
    +
    +import com.google.common.collect.Lists;
    +import org.apache.storm.Config;
    +import org.apache.storm.cluster.IStormClusterState;
    +import org.apache.storm.daemon.supervisor.SupervisorData;
    +import org.apache.storm.daemon.supervisor.SupervisorUtils;
    +import org.apache.storm.generated.ProfileAction;
    +import org.apache.storm.generated.ProfileRequest;
    +import org.apache.storm.utils.ConfigUtils;
    +import org.apache.storm.utils.Utils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.io.BufferedReader;
    +import java.io.File;
    +import java.io.FileReader;
    +import java.io.IOException;
    +import java.util.*;
    +
    +public class RunProfilerActions implements Runnable {
    +    private static Logger LOG = 
LoggerFactory.getLogger(RunProfilerActions.class);
    +
    +    private Map conf;
    +    private IStormClusterState stormClusterState;
    +    private String hostName;
    +
    +    private String profileCmd;
    +
    +    private SupervisorData supervisorData;
    +
    +    private class ActionExitCallback implements Utils.ExitCodeCallable {
    +        private String stormId;
    +        private ProfileRequest profileRequest;
    +        private String logPrefix;
    +        private boolean stop;
    +
    +        public ActionExitCallback(String stormId, ProfileRequest 
profileRequest, String logPrefix, boolean stop) {
    +            this.stormId = stormId;
    +            this.profileRequest = profileRequest;
    +            this.logPrefix = logPrefix;
    +            this.stop = stop;
    +        }
    +
    +        @Override
    +        public Object call() throws Exception {
    +            return null;
    +        }
    +
    +        @Override
    +        public Object call(int exitCode) {
    +            LOG.info("{} profile-action exited for {}", logPrefix, 
exitCode);
    +            try {
    +                if (stop)
    +                    
stormClusterState.deleteTopologyProfileRequests(stormId, profileRequest);
    +            } catch (Exception e) {
    +                LOG.warn("failed delete profileRequest: " + 
profileRequest);
    +            }
    +            return null;
    +        }
    +    }
    +
    +    public RunProfilerActions(SupervisorData supervisorData) {
    +        this.conf = supervisorData.getConf();
    +        this.stormClusterState = supervisorData.getStormClusterState();
    +        this.hostName = supervisorData.getHostName();
    +        this.profileCmd = (String) 
(conf.get(Config.WORKER_PROFILER_COMMAND));
    +        this.supervisorData = supervisorData;
    +    }
    +
    +    @Override
    +    public void run() {
    +        Map<String, List<ProfileRequest>> stormIdToActions = 
supervisorData.getStormIdToProfilerActions().get();
    +        try {
    +            for (Map.Entry<String, List<ProfileRequest>> entry : 
stormIdToActions.entrySet()) {
    +                String stormId = entry.getKey();
    +                List<ProfileRequest> requests = entry.getValue();
    +                if (requests != null) {
    +                    for (ProfileRequest profileRequest : requests) {
    +                        if 
(profileRequest.get_nodeInfo().get_node().equals(hostName)) {
    +                            boolean stop = System.currentTimeMillis() > 
profileRequest.get_time_stamp();
    +                            Long port = 
profileRequest.get_nodeInfo().get_port().iterator().next();
    +                            String targetDir = 
ConfigUtils.workerArtifactsRoot(conf, String.valueOf(port));
    +                            Map stormConf = 
ConfigUtils.readSupervisorStormConf(conf, stormId);
    +
    +                            String user = null;
    +                            if 
(stormConf.get(Config.TOPOLOGY_SUBMITTER_USER) != null) {
    +                                user = (String) 
(stormConf.get(Config.TOPOLOGY_SUBMITTER_USER));
    +                            }
    +                            Map<String, String> env = null;
    +                            if (stormConf.get(Config.TOPOLOGY_ENVIRONMENT) 
!= null) {
    +                                env = (Map<String, String>) 
stormConf.get(Config.TOPOLOGY_ENVIRONMENT);
    +                            } else {
    +                                env = new HashMap<String, String>();
    +                            }
    +
    +                            String str = 
ConfigUtils.workerArtifactsPidPath(conf, stormId, port.intValue());
    +                            StringBuilder stringBuilder = new 
StringBuilder();
    +
    +                            try (FileReader reader = new FileReader(str);
    +                                 BufferedReader br = new 
BufferedReader(reader)) {
    +                                int c;
    +                                while ((c = br.read()) >= 0) {
    +                                    stringBuilder.append(c);
    --- End diff --
    
    Appending an int is not what we want (it appends the decimal digits of the 
character code); we want to append an ASCII char.  We really should be doing a 
readLine.
    
    ```
                                String pidFile = 
ConfigUtils.workerArtifactsPidPath(conf, stormId, port.intValue());
                                String workerPid = null;
    
                                try (FileReader reader = new FileReader(pidFile);
                                     BufferedReader br = new 
BufferedReader(reader)) {
                                     workerPid = br.readLine().trim();
                                }
    ```


> port backtype.storm.daemon.supervisor to java
> ---------------------------------------------
>
>                 Key: STORM-1279
>                 URL: https://issues.apache.org/jira/browse/STORM-1279
>             Project: Apache Storm
>          Issue Type: New Feature
>          Components: storm-core
>            Reporter: Robert Joseph Evans
>            Assignee: John Fang
>              Labels: java-migration, jstorm-merger
>         Attachments: Discussion about supervisor.pdf
>
>
> https://github.com/apache/storm/tree/jstorm-import/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor
>  as an example
> backtype.storm.event usage should be replaced with built-in java threadpools.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to