[
https://issues.apache.org/jira/browse/STORM-1279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15218336#comment-15218336
]
ASF GitHub Bot commented on STORM-1279:
---------------------------------------
Github user revans2 commented on a diff in the pull request:
https://github.com/apache/storm/pull/1257#discussion_r57926189
--- Diff:
storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/RunProfilerActions.java
---
@@ -0,0 +1,221 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.daemon.supervisor.timer;
+
+import org.apache.storm.Config;
+import org.apache.storm.cluster.IStormClusterState;
+import org.apache.storm.daemon.supervisor.SupervisorData;
+import org.apache.storm.daemon.supervisor.SupervisorUtils;
+import org.apache.storm.generated.ProfileAction;
+import org.apache.storm.generated.ProfileRequest;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.*;
+
+public class RunProfilerActions implements Runnable {
+ private static Logger LOG =
LoggerFactory.getLogger(RunProfilerActions.class);
+
+ private Map conf;
+ private IStormClusterState stormClusterState;
+ private String hostName;
+
+ private String profileCmd;
+
+ private SupervisorData supervisorData;
+
+ private class ActionExitCallback implements Utils.ExitCodeCallable {
+ private String stormId;
+ private ProfileRequest profileRequest;
+ private String logPrefix;
+
+ public ActionExitCallback(String stormId, ProfileRequest
profileRequest, String logPrefix) {
+ this.stormId = stormId;
+ this.profileRequest = profileRequest;
+ this.logPrefix = logPrefix;
+ }
+
+ @Override
+ public Object call() throws Exception {
+ return null;
+ }
+
+ @Override
+ public Object call(int exitCode) {
+ LOG.info("{} profile-action exited for {}", logPrefix,
exitCode);
+ try {
+ stormClusterState.deleteTopologyProfileRequests(stormId,
profileRequest);
+ } catch (Exception e) {
+ LOG.warn("failed delete profileRequest: " +
profileRequest);
+ }
+ return null;
+ }
+ }
+
+ public RunProfilerActions(SupervisorData supervisorData) {
+ this.conf = supervisorData.getConf();
+ this.stormClusterState = supervisorData.getStormClusterState();
+ this.hostName = supervisorData.getHostName();
+ this.profileCmd = (String)
(conf.get(Config.WORKER_PROFILER_COMMAND));
+ this.supervisorData = supervisorData;
+ }
+
+ @Override
+ public void run() {
+ Map<String, List<ProfileRequest>> stormIdToActions =
supervisorData.getStormIdToProfileActions().get();
+ try {
+ for (Map.Entry<String, List<ProfileRequest>> entry :
stormIdToActions.entrySet()) {
+ String stormId = entry.getKey();
+ List<ProfileRequest> requests = entry.getValue();
+ if (requests != null) {
+ for (ProfileRequest profileRequest : requests) {
+ if
(profileRequest.get_nodeInfo().get_node().equals(hostName)) {
+ boolean stop = System.currentTimeMillis() >
profileRequest.get_time_stamp() ? true : false;
+ Long port =
profileRequest.get_nodeInfo().get_port().iterator().next();
+ String targetDir =
ConfigUtils.workerArtifactsRoot(conf, String.valueOf(port));
+ Map stormConf =
ConfigUtils.readSupervisorStormConf(conf, stormId);
+
+ String user = null;
+ if
(stormConf.get(Config.TOPOLOGY_SUBMITTER_USER) != null) {
+ user = (String)
(stormConf.get(Config.TOPOLOGY_SUBMITTER_USER));
+ }
+ Map<String, String> env = null;
+ if (stormConf.get(Config.TOPOLOGY_ENVIRONMENT)
!= null) {
+ env = (Map<String, String>)
stormConf.get(Config.TOPOLOGY_ENVIRONMENT);
+ } else {
+ env = new HashMap<String, String>();
+ }
+
+ String str =
ConfigUtils.workerArtifactsPidPath(conf, stormId, port.intValue());
+ StringBuilder stringBuilder = new
StringBuilder();
+ FileReader reader = null;
+ BufferedReader br = null;
+ try {
+ reader = new FileReader(str);
+ br = new BufferedReader(reader);
+ int c;
+ while ((c = br.read()) >= 0) {
+ stringBuilder.append(c);
+ }
+ } catch (IOException e) {
+ if (reader != null)
+ reader.close();
+ if (br != null)
+ br.close();
+ }
+ String workerPid =
stringBuilder.toString().trim();
+ ProfileAction profileAction =
profileRequest.get_action();
+ String logPrefix = "ProfilerAction process " +
stormId + ":" + port + " PROFILER_ACTION: " + profileAction + " ";
+
+ // Until PROFILER_STOP action is invalid, keep
launching profiler start in case worker restarted
+ // The profiler plugin script validates if JVM
is recording before starting another recording.
+ String command = mkCommand(profileAction,
stop, workerPid, targetDir);
+ List<String> listCommand = new ArrayList<>();
+ if (command != null) {
+
listCommand.addAll(Arrays.asList(command.split(" ")));
+ }
--- End diff --
Instead of returning a string and converting it to a List, can we just
return the list, that way we don't have to worry about white space anywhere in
the name of the profile command etc.
> port backtype.storm.daemon.supervisor to java
> ---------------------------------------------
>
> Key: STORM-1279
> URL: https://issues.apache.org/jira/browse/STORM-1279
> Project: Apache Storm
> Issue Type: New Feature
> Components: storm-core
> Reporter: Robert Joseph Evans
> Assignee: John Fang
> Labels: java-migration, jstorm-merger
> Attachments: Discussion about supervisor.pdf
>
>
> https://github.com/apache/storm/tree/jstorm-import/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor
> as an example
> backtype.storm.event usage should be replaced with built-in java threadpools.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)