update supervisor's structure
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b281c735 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b281c735 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b281c735 Branch: refs/heads/master Commit: b281c735f0089d24407af67586a1b41de45ac382 Parents: 08934e2 Author: xiaojian.fxj <[email protected]> Authored: Fri Feb 26 13:15:56 2016 +0800 Committer: xiaojian.fxj <[email protected]> Committed: Fri Feb 26 13:15:56 2016 +0800 ---------------------------------------------------------------------- .../daemon/supervisor/RunProfilerActions.java | 221 ------------------ .../daemon/supervisor/SupervisorHeartbeat.java | 84 ------- .../daemon/supervisor/SupervisorServer.java | 23 +- .../storm/daemon/supervisor/UpdateBlobs.java | 103 --------- .../supervisor/timer/RunProfilerActions.java | 223 +++++++++++++++++++ .../supervisor/timer/SupervisorHealthCheck.java | 57 +++++ .../supervisor/timer/SupervisorHeartbeat.java | 85 +++++++ .../daemon/supervisor/timer/UpdateBlobs.java | 105 +++++++++ 8 files changed, 476 insertions(+), 425 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/b281c735/storm-core/src/jvm/org/apache/storm/daemon/supervisor/RunProfilerActions.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/RunProfilerActions.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/RunProfilerActions.java deleted file mode 100644 index 209c067..0000000 --- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/RunProfilerActions.java +++ /dev/null @@ -1,221 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.daemon.supervisor; - -import org.apache.storm.Config; -import org.apache.storm.cluster.IStormClusterState; -import org.apache.storm.generated.ProfileAction; -import org.apache.storm.generated.ProfileRequest; -import org.apache.storm.utils.ConfigUtils; -import org.apache.storm.utils.Utils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.BufferedReader; -import java.io.File; -import java.io.FileReader; -import java.io.IOException; -import java.util.*; - -public class RunProfilerActions implements Runnable { - private static Logger LOG = LoggerFactory.getLogger(RunProfilerActions.class); - - private Map conf; - private IStormClusterState stormClusterState; - private String hostName; - private String stormHome; - - private String profileCmd; - - private SupervisorData supervisorData; - - private class ActionExitCallback implements Utils.ExitCodeCallable { - private String stormId; - private ProfileRequest profileRequest; - private String logPrefix; - - public ActionExitCallback(String stormId, ProfileRequest profileRequest, String logPrefix) { - this.stormId = stormId; - this.profileRequest = profileRequest; - this.logPrefix = logPrefix; - } - - @Override - public Object call() throws Exception { - return null; - } - - @Override - public Object call(int exitCode) { - LOG.info("{} profile-action exited for {}", logPrefix, exitCode); 
- try { - stormClusterState.deleteTopologyProfileRequests(stormId, profileRequest); - } catch (Exception e) { - LOG.warn("failed delete profileRequest: " + profileRequest); - } - return null; - } - } - - public RunProfilerActions(SupervisorData supervisorData) { - this.conf = supervisorData.getConf(); - this.stormClusterState = supervisorData.getStormClusterState(); - this.hostName = supervisorData.getHostName(); - this.stormHome = System.getProperty("storm.home"); - this.profileCmd = (String) (conf.get(Config.WORKER_PROFILER_COMMAND)); - this.supervisorData = supervisorData; - } - - @Override - public void run() { - Map<String, List<ProfileRequest>> stormIdToActions = supervisorData.getStormIdToProfileActions(); - try { - for (Map.Entry<String, List<ProfileRequest>> entry : stormIdToActions.entrySet()) { - String stormId = entry.getKey(); - List<ProfileRequest> requests = entry.getValue(); - if (requests != null) { - for (ProfileRequest profileRequest : requests) { - if (profileRequest.get_nodeInfo().get_node().equals(hostName)) { - boolean stop = System.currentTimeMillis() > profileRequest.get_time_stamp() ? 
true : false; - Long port = profileRequest.get_nodeInfo().get_port().iterator().next(); - String targetDir = ConfigUtils.workerArtifactsRoot(conf, String.valueOf(port)); - Map stormConf = ConfigUtils.readSupervisorStormConf(conf, stormId); - - String user = null; - if (stormConf.get(Config.TOPOLOGY_SUBMITTER_USER) != null) { - user = (String) (stormConf.get(Config.TOPOLOGY_SUBMITTER_USER)); - } - Map<String, String> env = null; - if (stormConf.get(Config.TOPOLOGY_ENVIRONMENT) != null) { - env = (Map<String, String>) stormConf.get(Config.TOPOLOGY_ENVIRONMENT); - } else { - env = new HashMap<String, String>(); - } - - String str = ConfigUtils.workerArtifactsPidPath(conf, stormId, port.intValue()); - StringBuilder stringBuilder = new StringBuilder(); - FileReader reader = null; - BufferedReader br = null; - try { - reader = new FileReader(str); - br = new BufferedReader(reader); - int c; - while ((c = br.read()) >= 0) { - stringBuilder.append(c); - } - } catch (IOException e) { - if (reader != null) - reader.close(); - if (br != null) - br.close(); - } - String workerPid = stringBuilder.toString().trim(); - ProfileAction profileAction = profileRequest.get_action(); - String logPrefix = "ProfilerAction process " + stormId + ":" + port + " PROFILER_ACTION: " + profileAction + " "; - - // Until PROFILER_STOP action is invalid, keep launching profiler start in case worker restarted - // The profiler plugin script validates if JVM is recording before starting another recording. 
- String command = mkCommand(profileAction, stop, workerPid, targetDir); - List<String> listCommand = new ArrayList<>(); - if (command != null) { - listCommand.addAll(Arrays.asList(command.split(" "))); - } - try { - ActionExitCallback actionExitCallback = new ActionExitCallback(stormId, profileRequest, logPrefix); - launchProfilerActionForWorker(user, targetDir, listCommand, env, actionExitCallback, logPrefix); - } catch (IOException e) { - LOG.error("Error in processing ProfilerAction '{}' for {}:{}, will retry later", profileAction, stormId, port); - } catch (RuntimeException e) { - LOG.error("Error in processing ProfilerAction '{}' for {}:{}, will retry later", profileAction, stormId, port); - } - } - } - } - } - } catch (Exception e) { - LOG.error("Error running profiler actions, will retry again later"); - } - } - - private void launchProfilerActionForWorker(String user, String targetDir, List<String> commands, Map<String, String> environment, - final Utils.ExitCodeCallable exitCodeCallable, String logPrefix) throws IOException { - File targetFile = new File(targetDir); - if (Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) { - LOG.info("Running as user:{} command:{}", user, commands); - String containerFile = Utils.containerFilePath(targetDir); - if (Utils.checkFileExists(containerFile)) { - SupervisorUtils.rmrAsUser(conf, containerFile, containerFile); - } - String scriptFile = Utils.scriptFilePath(targetDir); - if (Utils.checkFileExists(scriptFile)) { - SupervisorUtils.rmrAsUser(conf, scriptFile, scriptFile); - } - String script = Utils.writeScript(targetDir, commands, environment); - List<String> newCommands = new ArrayList<>(); - newCommands.add("profiler"); - newCommands.add(targetDir); - newCommands.add(script); - SupervisorUtils.workerLauncher(conf, user, newCommands, environment, logPrefix, exitCodeCallable, targetFile); - } else { - Utils.launchProcess(commands, environment, logPrefix, exitCodeCallable, targetFile); - } - } - 
- private String mkCommand(ProfileAction action, boolean stop, String workerPid, String targetDir) { - if (action == ProfileAction.JMAP_DUMP) { - return jmapDumpCmd(workerPid, targetDir); - } else if (action == ProfileAction.JSTACK_DUMP) { - return jstackDumpCmd(workerPid, targetDir); - } else if (action == ProfileAction.JPROFILE_DUMP) { - return jprofileDump(workerPid, targetDir); - } else if (action == ProfileAction.JVM_RESTART) { - return jprofileJvmRestart(workerPid); - } else if (!stop && action == ProfileAction.JPROFILE_STOP) { - return jprofileStart(workerPid); - } else if (stop && action == ProfileAction.JPROFILE_STOP) { - return jprofileStop(workerPid, targetDir); - } - return null; - } - - private String jmapDumpCmd(String pid, String targetDir) { - return profileCmd + " " + pid + " jmap " + targetDir; - } - - private String jstackDumpCmd(String pid, String targetDir) { - return profileCmd + " " + pid + " jstack " + targetDir; - } - - private String jprofileStart(String pid) { - return profileCmd + " " + pid + " start"; - } - - private String jprofileStop(String pid, String targetDir) { - return profileCmd + " " + pid + " stop " + targetDir; - } - - private String jprofileDump(String pid, String targetDir) { - return profileCmd + " " + pid + " dump " + targetDir; - } - - private String jprofileJvmRestart(String pid) { - return profileCmd + " " + pid + " kill"; - } - -} http://git-wip-us.apache.org/repos/asf/storm/blob/b281c735/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorHeartbeat.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorHeartbeat.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorHeartbeat.java deleted file mode 100644 index 399dcd2..0000000 --- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorHeartbeat.java +++ /dev/null @@ -1,84 +0,0 @@ -/** - * Licensed to the Apache Software Foundation 
(ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.storm.daemon.supervisor; - -import org.apache.storm.Config; -import org.apache.storm.cluster.IStormClusterState; -import org.apache.storm.generated.SupervisorInfo; -import org.apache.storm.utils.Time; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -public class SupervisorHeartbeat implements Runnable { - - private IStormClusterState stormClusterState; - private String supervisorId; - private Map conf; - private SupervisorInfo supervisorInfo; - - private SupervisorData supervisorData; - - public SupervisorHeartbeat(Map conf, SupervisorData supervisorData) { - this.stormClusterState = supervisorData.getStormClusterState(); - this.supervisorId = supervisorData.getSupervisorId(); - this.supervisorData = supervisorData; - this.conf = conf; - } - - private SupervisorInfo update(Map conf, SupervisorData supervisorData) { - supervisorInfo = new SupervisorInfo(); - supervisorInfo.set_time_secs(Time.currentTimeSecs()); - supervisorInfo.set_hostname(supervisorData.getHostName()); - supervisorInfo.set_assignment_id(supervisorData.getAssignmentId()); - - List<Long> usedPorts = new ArrayList<>(); - 
usedPorts.addAll(supervisorData.getCurrAssignment().keySet()); - supervisorInfo.set_used_ports(usedPorts); - List<Long> portList = new ArrayList<>(); - Object metas = supervisorData.getiSupervisor().getMetadata(); - if (metas != null) { - for (Integer port : (List<Integer>) metas) { - portList.add(port.longValue()); - } - } - supervisorInfo.set_meta(portList); - supervisorInfo.set_scheduler_meta((Map<String, String>) conf.get(Config.SUPERVISOR_SCHEDULER_META)); - supervisorInfo.set_uptime_secs(supervisorData.getUpTime().upTime()); - supervisorInfo.set_version(supervisorData.getStormVersion()); - supervisorInfo.set_resources_map(mkSupervisorCapacities(conf)); - return supervisorInfo; - } - - private Map<String, Double> mkSupervisorCapacities(Map conf) { - Map<String, Double> ret = new HashMap<String, Double>(); - Double mem = (double) (conf.get(Config.SUPERVISOR_MEMORY_CAPACITY_MB)); - ret.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, mem); - Double cpu = (double) (conf.get(Config.SUPERVISOR_CPU_CAPACITY)); - ret.put(Config.SUPERVISOR_CPU_CAPACITY, cpu); - return ret; - } - - @Override - public void run() { - SupervisorInfo supervisorInfo = update(conf, supervisorData); - stormClusterState.supervisorHeartbeat(supervisorId, supervisorInfo); - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/b281c735/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorServer.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorServer.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorServer.java index f1dfb8a..fd31631 100644 --- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorServer.java +++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorServer.java @@ -25,6 +25,10 @@ import org.apache.storm.StormTimer; import org.apache.storm.command.HealthCheck; import org.apache.storm.daemon.metrics.MetricsUtils; import 
org.apache.storm.daemon.metrics.reporters.PreparableReporter; +import org.apache.storm.daemon.supervisor.timer.RunProfilerActions; +import org.apache.storm.daemon.supervisor.timer.SupervisorHealthCheck; +import org.apache.storm.daemon.supervisor.timer.SupervisorHeartbeat; +import org.apache.storm.daemon.supervisor.timer.UpdateBlobs; import org.apache.storm.event.EventManagerImp; import org.apache.storm.localizer.Localizer; import org.apache.storm.messaging.IContext; @@ -42,7 +46,7 @@ import java.util.List; import java.util.Map; import java.util.Set; -public class SupervisorServer extends ShutdownWork { +public class SupervisorServer { private static Logger LOG = LoggerFactory.getLogger(SupervisorServer.class); /** @@ -98,22 +102,7 @@ public class SupervisorServer extends ShutdownWork { supervisorData.getBlobUpdateTimer().scheduleRecurring(30, 30, new EventManagerPushCallback(updateBlobsThread, syncSupEventManager)); // supervisor health check - eventTimer.scheduleRecurring(300, 300, new Runnable() { - @Override - public void run() { - int healthCode = HealthCheck.healthCheck(conf); - Collection<String> workerIds = SupervisorUtils.supervisorWorkerIds(conf); - if (healthCode != 0) { - for (String workerId : workerIds) { - try { - shutWorker(supervisorData, workerId); - } catch (Exception e) { - throw Utils.wrapInRuntime(e); - } - } - } - } - }); + eventTimer.scheduleRecurring(300, 300, new SupervisorHealthCheck(supervisorData)); // Launch a thread that Runs profiler commands . 
Starts with 30 seconds delay, every 30 seconds eventTimer.scheduleRecurring(30, 30, new EventManagerPushCallback(runProfilerActionThread, syncSupEventManager)); http://git-wip-us.apache.org/repos/asf/storm/blob/b281c735/storm-core/src/jvm/org/apache/storm/daemon/supervisor/UpdateBlobs.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/UpdateBlobs.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/UpdateBlobs.java deleted file mode 100644 index 90dccae..0000000 --- a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/UpdateBlobs.java +++ /dev/null @@ -1,103 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.storm.daemon.supervisor; - -import org.apache.storm.Config; -import org.apache.storm.generated.AuthorizationException; -import org.apache.storm.generated.KeyNotFoundException; -import org.apache.storm.generated.LocalAssignment; -import org.apache.storm.localizer.LocalResource; -import org.apache.storm.localizer.Localizer; -import org.apache.storm.utils.ConfigUtils; -import org.apache.storm.utils.NimbusLeaderNotFoundException; -import org.apache.storm.utils.Utils; -import org.apache.thrift.transport.TTransportException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; - -/** - * downloads all blobs listed in the topology configuration for all topologies assigned to this supervisor, and creates version files with a suffix. The - * Runnable is intended to be run periodically by a timer, created elsewhere. 
- */ -public class UpdateBlobs implements Runnable { - - private static final Logger LOG = LoggerFactory.getLogger(UpdateBlobs.class); - - private SupervisorData supervisorData; - - public UpdateBlobs(SupervisorData supervisorData) { - this.supervisorData = supervisorData; - } - - @Override - public void run() { - try { - Map conf = supervisorData.getConf(); - Set<String> downloadedStormIds = SupervisorUtils.readDownLoadedStormIds(conf); - ConcurrentHashMap<Long, LocalAssignment> newAssignment = supervisorData.getCurrAssignment(); - Set<String> assignedStormIds = new HashSet<>(); - for (LocalAssignment localAssignment : newAssignment.values()) { - assignedStormIds.add(localAssignment.get_topology_id()); - } - for (String stormId : downloadedStormIds) { - if (assignedStormIds.contains(stormId)) { - String stormRoot = ConfigUtils.supervisorStormDistRoot(conf, stormId); - LOG.debug("Checking Blob updates for storm topology id {} With target_dir: {}", stormId, stormRoot); - updateBlobsForTopology(conf, stormId, supervisorData.getLocalizer()); - } - } - } catch (Exception e) { - if (Utils.exceptionCauseIsInstanceOf(TTransportException.class, e)) { - LOG.error("Network error while updating blobs, will retry again later", e); - } else if (Utils.exceptionCauseIsInstanceOf(NimbusLeaderNotFoundException.class, e)) { - LOG.error("Nimbus unavailable to update blobs, will retry again later", e); - } else { - throw Utils.wrapInRuntime(e); - } - } - } - - /** - * Update each blob listed in the topology configuration if the latest version of the blob has not been downloaded. 
- * - * @param conf - * @param stormId - * @param localizer - * @throws IOException - */ - private void updateBlobsForTopology(Map conf, String stormId, Localizer localizer) throws IOException { - Map stormConf = ConfigUtils.readSupervisorStormConf(conf, stormId); - Map<String, Map<String, Object>> blobstoreMap = (Map<String, Map<String, Object>>) stormConf.get(Config.TOPOLOGY_BLOBSTORE_MAP); - String user = (String) stormConf.get(Config.TOPOLOGY_SUBMITTER_USER); - List<LocalResource> localresources = SupervisorUtils.blobstoreMapToLocalresources(blobstoreMap); - try { - localizer.updateBlobs(localresources, user); - } catch (AuthorizationException authExp) { - LOG.error("AuthorizationException error", authExp); - } catch (KeyNotFoundException knf) { - LOG.error("KeyNotFoundException error", knf); - } - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/b281c735/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/RunProfilerActions.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/RunProfilerActions.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/RunProfilerActions.java new file mode 100644 index 0000000..2d73327 --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/RunProfilerActions.java @@ -0,0 +1,223 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.daemon.supervisor.timer; + +import org.apache.storm.Config; +import org.apache.storm.cluster.IStormClusterState; +import org.apache.storm.daemon.supervisor.SupervisorData; +import org.apache.storm.daemon.supervisor.SupervisorUtils; +import org.apache.storm.generated.ProfileAction; +import org.apache.storm.generated.ProfileRequest; +import org.apache.storm.utils.ConfigUtils; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileReader; +import java.io.IOException; +import java.util.*; + +public class RunProfilerActions implements Runnable { + private static Logger LOG = LoggerFactory.getLogger(RunProfilerActions.class); + + private Map conf; + private IStormClusterState stormClusterState; + private String hostName; + private String stormHome; + + private String profileCmd; + + private SupervisorData supervisorData; + + private class ActionExitCallback implements Utils.ExitCodeCallable { + private String stormId; + private ProfileRequest profileRequest; + private String logPrefix; + + public ActionExitCallback(String stormId, ProfileRequest profileRequest, String logPrefix) { + this.stormId = stormId; + this.profileRequest = profileRequest; + this.logPrefix = logPrefix; + } + + @Override + public Object call() throws Exception { + return null; + } + + @Override + public Object call(int exitCode) { + LOG.info("{} profile-action exited for {}", logPrefix, exitCode); + try { + 
stormClusterState.deleteTopologyProfileRequests(stormId, profileRequest); + } catch (Exception e) { + LOG.warn("failed delete profileRequest: " + profileRequest); + } + return null; + } + } + + public RunProfilerActions(SupervisorData supervisorData) { + this.conf = supervisorData.getConf(); + this.stormClusterState = supervisorData.getStormClusterState(); + this.hostName = supervisorData.getHostName(); + this.stormHome = System.getProperty("storm.home"); + this.profileCmd = (String) (conf.get(Config.WORKER_PROFILER_COMMAND)); + this.supervisorData = supervisorData; + } + + @Override + public void run() { + Map<String, List<ProfileRequest>> stormIdToActions = supervisorData.getStormIdToProfileActions(); + try { + for (Map.Entry<String, List<ProfileRequest>> entry : stormIdToActions.entrySet()) { + String stormId = entry.getKey(); + List<ProfileRequest> requests = entry.getValue(); + if (requests != null) { + for (ProfileRequest profileRequest : requests) { + if (profileRequest.get_nodeInfo().get_node().equals(hostName)) { + boolean stop = System.currentTimeMillis() > profileRequest.get_time_stamp() ? 
true : false; + Long port = profileRequest.get_nodeInfo().get_port().iterator().next(); + String targetDir = ConfigUtils.workerArtifactsRoot(conf, String.valueOf(port)); + Map stormConf = ConfigUtils.readSupervisorStormConf(conf, stormId); + + String user = null; + if (stormConf.get(Config.TOPOLOGY_SUBMITTER_USER) != null) { + user = (String) (stormConf.get(Config.TOPOLOGY_SUBMITTER_USER)); + } + Map<String, String> env = null; + if (stormConf.get(Config.TOPOLOGY_ENVIRONMENT) != null) { + env = (Map<String, String>) stormConf.get(Config.TOPOLOGY_ENVIRONMENT); + } else { + env = new HashMap<String, String>(); + } + + String str = ConfigUtils.workerArtifactsPidPath(conf, stormId, port.intValue()); + StringBuilder stringBuilder = new StringBuilder(); + FileReader reader = null; + BufferedReader br = null; + try { + reader = new FileReader(str); + br = new BufferedReader(reader); + int c; + while ((c = br.read()) >= 0) { + stringBuilder.append(c); + } + } catch (IOException e) { + if (reader != null) + reader.close(); + if (br != null) + br.close(); + } + String workerPid = stringBuilder.toString().trim(); + ProfileAction profileAction = profileRequest.get_action(); + String logPrefix = "ProfilerAction process " + stormId + ":" + port + " PROFILER_ACTION: " + profileAction + " "; + + // Until PROFILER_STOP action is invalid, keep launching profiler start in case worker restarted + // The profiler plugin script validates if JVM is recording before starting another recording. 
+ String command = mkCommand(profileAction, stop, workerPid, targetDir); + List<String> listCommand = new ArrayList<>(); + if (command != null) { + listCommand.addAll(Arrays.asList(command.split(" "))); + } + try { + ActionExitCallback actionExitCallback = new ActionExitCallback(stormId, profileRequest, logPrefix); + launchProfilerActionForWorker(user, targetDir, listCommand, env, actionExitCallback, logPrefix); + } catch (IOException e) { + LOG.error("Error in processing ProfilerAction '{}' for {}:{}, will retry later", profileAction, stormId, port); + } catch (RuntimeException e) { + LOG.error("Error in processing ProfilerAction '{}' for {}:{}, will retry later", profileAction, stormId, port); + } + } + } + } + } + } catch (Exception e) { + LOG.error("Error running profiler actions, will retry again later"); + } + } + + private void launchProfilerActionForWorker(String user, String targetDir, List<String> commands, Map<String, String> environment, + final Utils.ExitCodeCallable exitCodeCallable, String logPrefix) throws IOException { + File targetFile = new File(targetDir); + if (Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) { + LOG.info("Running as user:{} command:{}", user, commands); + String containerFile = Utils.containerFilePath(targetDir); + if (Utils.checkFileExists(containerFile)) { + SupervisorUtils.rmrAsUser(conf, containerFile, containerFile); + } + String scriptFile = Utils.scriptFilePath(targetDir); + if (Utils.checkFileExists(scriptFile)) { + SupervisorUtils.rmrAsUser(conf, scriptFile, scriptFile); + } + String script = Utils.writeScript(targetDir, commands, environment); + List<String> newCommands = new ArrayList<>(); + newCommands.add("profiler"); + newCommands.add(targetDir); + newCommands.add(script); + SupervisorUtils.workerLauncher(conf, user, newCommands, environment, logPrefix, exitCodeCallable, targetFile); + } else { + Utils.launchProcess(commands, environment, logPrefix, exitCodeCallable, targetFile); + } + } + 
+ private String mkCommand(ProfileAction action, boolean stop, String workerPid, String targetDir) { + if (action == ProfileAction.JMAP_DUMP) { + return jmapDumpCmd(workerPid, targetDir); + } else if (action == ProfileAction.JSTACK_DUMP) { + return jstackDumpCmd(workerPid, targetDir); + } else if (action == ProfileAction.JPROFILE_DUMP) { + return jprofileDump(workerPid, targetDir); + } else if (action == ProfileAction.JVM_RESTART) { + return jprofileJvmRestart(workerPid); + } else if (!stop && action == ProfileAction.JPROFILE_STOP) { + return jprofileStart(workerPid); + } else if (stop && action == ProfileAction.JPROFILE_STOP) { + return jprofileStop(workerPid, targetDir); + } + return null; + } + + private String jmapDumpCmd(String pid, String targetDir) { + return profileCmd + " " + pid + " jmap " + targetDir; + } + + private String jstackDumpCmd(String pid, String targetDir) { + return profileCmd + " " + pid + " jstack " + targetDir; + } + + private String jprofileStart(String pid) { + return profileCmd + " " + pid + " start"; + } + + private String jprofileStop(String pid, String targetDir) { + return profileCmd + " " + pid + " stop " + targetDir; + } + + private String jprofileDump(String pid, String targetDir) { + return profileCmd + " " + pid + " dump " + targetDir; + } + + private String jprofileJvmRestart(String pid) { + return profileCmd + " " + pid + " kill"; + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/b281c735/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHealthCheck.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHealthCheck.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHealthCheck.java new file mode 100644 index 0000000..36ee6b6 --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHealthCheck.java @@ -0,0 +1,57 @@ +/** + * Licensed to 
the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.daemon.supervisor.timer; + +import org.apache.storm.command.HealthCheck; +import org.apache.storm.daemon.supervisor.ShutdownWork; +import org.apache.storm.daemon.supervisor.SupervisorData; +import org.apache.storm.daemon.supervisor.SupervisorUtils; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collection; +import java.util.Map; + +public class SupervisorHealthCheck extends ShutdownWork implements Runnable { + + private static final Logger LOG = LoggerFactory.getLogger(SupervisorHealthCheck.class); + + private SupervisorData supervisorData; + + public SupervisorHealthCheck(SupervisorData supervisorData) { + this.supervisorData = supervisorData; + } + + @Override + public void run() { + Map conf = supervisorData.getConf(); + int healthCode = HealthCheck.healthCheck(conf); + Collection<String> workerIds = SupervisorUtils.supervisorWorkerIds(conf); + if (healthCode != 0) { + for (String workerId : workerIds) { + try { + shutWorker(supervisorData, workerId); + } catch (Exception e) { + throw Utils.wrapInRuntime(e); + } + } + } + } +} 
http://git-wip-us.apache.org/repos/asf/storm/blob/b281c735/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java new file mode 100644 index 0000000..d41ca87 --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
package org.apache.storm.daemon.supervisor.timer;

import org.apache.storm.Config;
import org.apache.storm.cluster.IStormClusterState;
import org.apache.storm.daemon.supervisor.SupervisorData;
import org.apache.storm.generated.SupervisorInfo;
import org.apache.storm.utils.Time;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Runnable that builds a fresh {@link SupervisorInfo} from the supervisor's current state and
 * publishes it through {@code IStormClusterState.supervisorHeartbeat}.
 */
public class SupervisorHeartbeat implements Runnable {

    private final IStormClusterState stormClusterState;
    private final String supervisorId;
    private final Map conf;
    private final SupervisorData supervisorData;

    /**
     * @param conf           configuration read for the scheduler meta and capacity settings
     * @param supervisorData source of the cluster state handle, supervisor id and runtime state
     */
    public SupervisorHeartbeat(Map conf, SupervisorData supervisorData) {
        this.stormClusterState = supervisorData.getStormClusterState();
        this.supervisorId = supervisorData.getSupervisorId();
        this.supervisorData = supervisorData;
        this.conf = conf;
    }

    /**
     * Assembles the heartbeat payload: current time, host name, assignment id, used ports (the keys
     * of the current assignment), supervisor metadata, scheduler meta, uptime, version and the
     * resource capacities.
     *
     * @return a newly created {@link SupervisorInfo}; nothing is cached between calls
     * @throws IllegalArgumentException if a capacity setting is missing or is not a number
     */
    // The supervisor metadata and scheduler meta are stored untyped, so both casts are unchecked.
    @SuppressWarnings("unchecked")
    private SupervisorInfo update(Map conf, SupervisorData supervisorData) {
        SupervisorInfo info = new SupervisorInfo();
        info.set_time_secs(Time.currentTimeSecs());
        info.set_hostname(supervisorData.getHostName());
        info.set_assignment_id(supervisorData.getAssignmentId());

        List<Long> usedPorts = new ArrayList<>();
        usedPorts.addAll(supervisorData.getCurrAssignment().keySet());
        info.set_used_ports(usedPorts);

        // NOTE(review): metadata is treated as a List<Integer>, presumably port numbers; confirm
        // against the ISupervisor implementation.
        List<Long> portList = new ArrayList<>();
        Object metas = supervisorData.getiSupervisor().getMetadata();
        if (metas != null) {
            for (Integer port : (List<Integer>) metas) {
                portList.add(port.longValue());
            }
        }
        info.set_meta(portList);

        info.set_scheduler_meta((Map<String, String>) conf.get(Config.SUPERVISOR_SCHEDULER_META));
        info.set_uptime_secs(supervisorData.getUpTime().upTime());
        info.set_version(supervisorData.getStormVersion());
        info.set_resources_map(mkSupervisorCapacities(conf));
        return info;
    }

    /**
     * Reads the memory and CPU capacity settings and returns them keyed by their config names.
     *
     * @throws IllegalArgumentException if either setting is missing or is not a number
     */
    private Map<String, Double> mkSupervisorCapacities(Map conf) {
        Map<String, Double> ret = new HashMap<String, Double>();
        ret.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, requiredDouble(conf, Config.SUPERVISOR_MEMORY_CAPACITY_MB));
        ret.put(Config.SUPERVISOR_CPU_CAPACITY, requiredDouble(conf, Config.SUPERVISOR_CPU_CAPACITY));
        return ret;
    }

    /**
     * Converts a numeric config value to a {@code Double}.
     *
     * <p>A direct {@code (double)} cast of the stored {@code Object} only works when the value is
     * already a {@code Double}; a value stored as {@code Integer} or {@code Long} would fail with a
     * {@code ClassCastException}. Going through {@link Number} accepts every numeric type.
     */
    private static Double requiredDouble(Map conf, String key) {
        Object value = conf.get(key);
        if (value instanceof Number) {
            return ((Number) value).doubleValue();
        }
        if (value == null) {
            throw new IllegalArgumentException("Missing required supervisor config: " + key);
        }
        throw new IllegalArgumentException("Supervisor config " + key + " must be a number but was "
                + value.getClass().getName() + ": " + value);
    }

    @Override
    public void run() {
        SupervisorInfo supervisorInfo = update(conf, supervisorData);
        stormClusterState.supervisorHeartbeat(supervisorId, supervisorInfo);
    }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/b281c735/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/UpdateBlobs.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/UpdateBlobs.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/UpdateBlobs.java new file mode 100644 index 0000000..623afa5 --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/UpdateBlobs.java @@ -0,0 +1,105 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
package org.apache.storm.daemon.supervisor.timer;

import org.apache.storm.Config;
import org.apache.storm.daemon.supervisor.SupervisorData;
import org.apache.storm.daemon.supervisor.SupervisorUtils;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.KeyNotFoundException;
import org.apache.storm.generated.LocalAssignment;
import org.apache.storm.localizer.LocalResource;
import org.apache.storm.localizer.Localizer;
import org.apache.storm.utils.ConfigUtils;
import org.apache.storm.utils.NimbusLeaderNotFoundException;
import org.apache.storm.utils.Utils;
import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Refreshes the blobs declared in the topology configuration of every topology that is both
 * downloaded on and currently assigned to this supervisor, by delegating to
 * {@code Localizer.updateBlobs}. Meant to be executed periodically by a timer that is set up
 * elsewhere.
 */
public class UpdateBlobs implements Runnable {

    private static final Logger LOG = LoggerFactory.getLogger(UpdateBlobs.class);

    private SupervisorData supervisorData;

    public UpdateBlobs(SupervisorData supervisorData) {
        this.supervisorData = supervisorData;
    }

    /**
     * Performs one refresh pass. Failures caused by a {@code TTransportException} or a
     * {@code NimbusLeaderNotFoundException} are logged and skipped; any other exception is
     * rethrown via {@code Utils.wrapInRuntime}.
     */
    @Override
    public void run() {
        try {
            Map conf = supervisorData.getConf();
            Set<String> downloadedStormIds = SupervisorUtils.readDownLoadedStormIds(conf);
            Set<String> assignedStormIds = assignedTopologyIds(supervisorData.getCurrAssignment());
            for (String stormId : downloadedStormIds) {
                if (!assignedStormIds.contains(stormId)) {
                    continue;
                }
                String stormRoot = ConfigUtils.supervisorStormDistRoot(conf, stormId);
                LOG.debug("Checking Blob updates for storm topology id {} With target_dir: {}", stormId, stormRoot);
                updateBlobsForTopology(conf, stormId, supervisorData.getLocalizer());
            }
        } catch (Exception e) {
            if (Utils.exceptionCauseIsInstanceOf(TTransportException.class, e)) {
                LOG.error("Network error while updating blobs, will retry again later", e);
                return;
            }
            if (Utils.exceptionCauseIsInstanceOf(NimbusLeaderNotFoundException.class, e)) {
                LOG.error("Nimbus unavailable to update blobs, will retry again later", e);
                return;
            }
            throw Utils.wrapInRuntime(e);
        }
    }

    /** Collects the topology id of every local assignment in the given map. */
    private Set<String> assignedTopologyIds(ConcurrentHashMap<Long, LocalAssignment> assignments) {
        Set<String> topologyIds = new HashSet<>();
        for (LocalAssignment assignment : assignments.values()) {
            topologyIds.add(assignment.get_topology_id());
        }
        return topologyIds;
    }

    /**
     * Asks the localizer to update the blobs named in one topology's blobstore map.
     * {@code AuthorizationException} and {@code KeyNotFoundException} are logged and not propagated.
     *
     * @param conf      supervisor configuration used to locate the topology's stored conf
     * @param stormId   id of the topology whose blobs are refreshed
     * @param localizer localizer that performs the update
     * @throws IOException declared by this method; raised by the calls it makes
     */
    private void updateBlobsForTopology(Map conf, String stormId, Localizer localizer) throws IOException {
        Map topologyConf = ConfigUtils.readSupervisorStormConf(conf, stormId);
        Map<String, Map<String, Object>> blobstoreMap =
                (Map<String, Map<String, Object>>) topologyConf.get(Config.TOPOLOGY_BLOBSTORE_MAP);
        String submitter = (String) topologyConf.get(Config.TOPOLOGY_SUBMITTER_USER);
        List<LocalResource> resources = SupervisorUtils.blobstoreMapToLocalresources(blobstoreMap);
        try {
            localizer.updateBlobs(resources, submitter);
        } catch (AuthorizationException e) {
            LOG.error("AuthorizationException error", e);
        } catch (KeyNotFoundException e) {
            LOG.error("KeyNotFoundException error", e);
        }
    }
}
