port Supervisor to java
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/08934e29 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/08934e29 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/08934e29 Branch: refs/heads/master Commit: 08934e29982d3936c9e247a8d7bac563053f869f Parents: 73312ad Author: xiaojian.fxj <[email protected]> Authored: Fri Feb 26 12:38:23 2016 +0800 Committer: xiaojian.fxj <[email protected]> Committed: Fri Feb 26 12:38:23 2016 +0800 ---------------------------------------------------------------------- .../storm/daemon/supervisor/DaemonCommon.java | 22 + .../DefaultUncaughtExceptionHandler.java | 31 + .../supervisor/EventManagerPushCallback.java | 37 + .../daemon/supervisor/RunProfilerActions.java | 221 ++++++ .../storm/daemon/supervisor/ShutdownWork.java | 125 ++++ .../daemon/supervisor/StandaloneSupervisor.java | 82 +++ .../apache/storm/daemon/supervisor/State.java | 22 + .../storm/daemon/supervisor/StateHeartbeat.java | 45 ++ .../daemon/supervisor/SupervisorDaemon.java | 28 + .../storm/daemon/supervisor/SupervisorData.java | 340 ++++++++++ .../daemon/supervisor/SupervisorHeartbeat.java | 84 +++ .../daemon/supervisor/SupervisorManger.java | 101 +++ .../daemon/supervisor/SupervisorServer.java | 212 ++++++ .../daemon/supervisor/SupervisorUtils.java | 173 +++++ .../daemon/supervisor/SyncProcessEvent.java | 674 +++++++++++++++++++ .../daemon/supervisor/SyncSupervisorEvent.java | 592 ++++++++++++++++ .../storm/daemon/supervisor/UpdateBlobs.java | 103 +++ 17 files changed, 2892 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/08934e29/storm-core/src/jvm/org/apache/storm/daemon/supervisor/DaemonCommon.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/DaemonCommon.java 
/**
 * Minimal contract shared by supervisor-side daemons: a single query that
 * tells the caller whether the daemon is currently "waiting".
 */
public interface DaemonCommon {

    /**
     * Reports the daemon's waiting status.
     *
     * <p>NOTE(review): the exact meaning of "waiting" is defined by the
     * implementing class and is not visible from this interface — confirm
     * against the implementers before relying on it.
     *
     * @return {@code true} when the daemon considers itself waiting
     */
    boolean isWaiting();
}
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.daemon.supervisor; + +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DefaultUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler { + private static final Logger LOG = LoggerFactory.getLogger(DefaultUncaughtExceptionHandler.class); + @Override + public void uncaughtException(Thread t, Throwable e) { + LOG.error("Error when processing event", e); + Utils.exitProcess(20, "Error when processing an event"); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/08934e29/storm-core/src/jvm/org/apache/storm/daemon/supervisor/EventManagerPushCallback.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/EventManagerPushCallback.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/EventManagerPushCallback.java new file mode 100644 index 0000000..177bf67 --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/EventManagerPushCallback.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.daemon.supervisor; + +import org.apache.storm.event.EventManager; + +public class EventManagerPushCallback implements Runnable { + + private EventManager eventManager; + + private Runnable cb; + + public EventManagerPushCallback(Runnable cb, EventManager eventManager) { + this.eventManager = eventManager; + this.cb = cb; + } + + @Override + public void run() { + eventManager.add(cb); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/08934e29/storm-core/src/jvm/org/apache/storm/daemon/supervisor/RunProfilerActions.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/RunProfilerActions.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/RunProfilerActions.java new file mode 100644 index 0000000..209c067 --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/RunProfilerActions.java @@ -0,0 +1,221 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.daemon.supervisor; + +import org.apache.storm.Config; +import org.apache.storm.cluster.IStormClusterState; +import org.apache.storm.generated.ProfileAction; +import org.apache.storm.generated.ProfileRequest; +import org.apache.storm.utils.ConfigUtils; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileReader; +import java.io.IOException; +import java.util.*; + +public class RunProfilerActions implements Runnable { + private static Logger LOG = LoggerFactory.getLogger(RunProfilerActions.class); + + private Map conf; + private IStormClusterState stormClusterState; + private String hostName; + private String stormHome; + + private String profileCmd; + + private SupervisorData supervisorData; + + private class ActionExitCallback implements Utils.ExitCodeCallable { + private String stormId; + private ProfileRequest profileRequest; + private String logPrefix; + + public ActionExitCallback(String stormId, ProfileRequest profileRequest, String logPrefix) { + this.stormId = stormId; + this.profileRequest = profileRequest; + this.logPrefix = logPrefix; + } + + @Override + public Object call() throws Exception { + return null; + } + + @Override + public Object call(int exitCode) { + LOG.info("{} profile-action exited for {}", logPrefix, exitCode); + try { + stormClusterState.deleteTopologyProfileRequests(stormId, profileRequest); + } catch (Exception e) { + LOG.warn("failed delete profileRequest: " + 
profileRequest); + } + return null; + } + } + + public RunProfilerActions(SupervisorData supervisorData) { + this.conf = supervisorData.getConf(); + this.stormClusterState = supervisorData.getStormClusterState(); + this.hostName = supervisorData.getHostName(); + this.stormHome = System.getProperty("storm.home"); + this.profileCmd = (String) (conf.get(Config.WORKER_PROFILER_COMMAND)); + this.supervisorData = supervisorData; + } + + @Override + public void run() { + Map<String, List<ProfileRequest>> stormIdToActions = supervisorData.getStormIdToProfileActions(); + try { + for (Map.Entry<String, List<ProfileRequest>> entry : stormIdToActions.entrySet()) { + String stormId = entry.getKey(); + List<ProfileRequest> requests = entry.getValue(); + if (requests != null) { + for (ProfileRequest profileRequest : requests) { + if (profileRequest.get_nodeInfo().get_node().equals(hostName)) { + boolean stop = System.currentTimeMillis() > profileRequest.get_time_stamp() ? true : false; + Long port = profileRequest.get_nodeInfo().get_port().iterator().next(); + String targetDir = ConfigUtils.workerArtifactsRoot(conf, String.valueOf(port)); + Map stormConf = ConfigUtils.readSupervisorStormConf(conf, stormId); + + String user = null; + if (stormConf.get(Config.TOPOLOGY_SUBMITTER_USER) != null) { + user = (String) (stormConf.get(Config.TOPOLOGY_SUBMITTER_USER)); + } + Map<String, String> env = null; + if (stormConf.get(Config.TOPOLOGY_ENVIRONMENT) != null) { + env = (Map<String, String>) stormConf.get(Config.TOPOLOGY_ENVIRONMENT); + } else { + env = new HashMap<String, String>(); + } + + String str = ConfigUtils.workerArtifactsPidPath(conf, stormId, port.intValue()); + StringBuilder stringBuilder = new StringBuilder(); + FileReader reader = null; + BufferedReader br = null; + try { + reader = new FileReader(str); + br = new BufferedReader(reader); + int c; + while ((c = br.read()) >= 0) { + stringBuilder.append(c); + } + } catch (IOException e) { + if (reader != null) + 
reader.close(); + if (br != null) + br.close(); + } + String workerPid = stringBuilder.toString().trim(); + ProfileAction profileAction = profileRequest.get_action(); + String logPrefix = "ProfilerAction process " + stormId + ":" + port + " PROFILER_ACTION: " + profileAction + " "; + + // Until PROFILER_STOP action is invalid, keep launching profiler start in case worker restarted + // The profiler plugin script validates if JVM is recording before starting another recording. + String command = mkCommand(profileAction, stop, workerPid, targetDir); + List<String> listCommand = new ArrayList<>(); + if (command != null) { + listCommand.addAll(Arrays.asList(command.split(" "))); + } + try { + ActionExitCallback actionExitCallback = new ActionExitCallback(stormId, profileRequest, logPrefix); + launchProfilerActionForWorker(user, targetDir, listCommand, env, actionExitCallback, logPrefix); + } catch (IOException e) { + LOG.error("Error in processing ProfilerAction '{}' for {}:{}, will retry later", profileAction, stormId, port); + } catch (RuntimeException e) { + LOG.error("Error in processing ProfilerAction '{}' for {}:{}, will retry later", profileAction, stormId, port); + } + } + } + } + } + } catch (Exception e) { + LOG.error("Error running profiler actions, will retry again later"); + } + } + + private void launchProfilerActionForWorker(String user, String targetDir, List<String> commands, Map<String, String> environment, + final Utils.ExitCodeCallable exitCodeCallable, String logPrefix) throws IOException { + File targetFile = new File(targetDir); + if (Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) { + LOG.info("Running as user:{} command:{}", user, commands); + String containerFile = Utils.containerFilePath(targetDir); + if (Utils.checkFileExists(containerFile)) { + SupervisorUtils.rmrAsUser(conf, containerFile, containerFile); + } + String scriptFile = Utils.scriptFilePath(targetDir); + if (Utils.checkFileExists(scriptFile)) { + 
SupervisorUtils.rmrAsUser(conf, scriptFile, scriptFile); + } + String script = Utils.writeScript(targetDir, commands, environment); + List<String> newCommands = new ArrayList<>(); + newCommands.add("profiler"); + newCommands.add(targetDir); + newCommands.add(script); + SupervisorUtils.workerLauncher(conf, user, newCommands, environment, logPrefix, exitCodeCallable, targetFile); + } else { + Utils.launchProcess(commands, environment, logPrefix, exitCodeCallable, targetFile); + } + } + + private String mkCommand(ProfileAction action, boolean stop, String workerPid, String targetDir) { + if (action == ProfileAction.JMAP_DUMP) { + return jmapDumpCmd(workerPid, targetDir); + } else if (action == ProfileAction.JSTACK_DUMP) { + return jstackDumpCmd(workerPid, targetDir); + } else if (action == ProfileAction.JPROFILE_DUMP) { + return jprofileDump(workerPid, targetDir); + } else if (action == ProfileAction.JVM_RESTART) { + return jprofileJvmRestart(workerPid); + } else if (!stop && action == ProfileAction.JPROFILE_STOP) { + return jprofileStart(workerPid); + } else if (stop && action == ProfileAction.JPROFILE_STOP) { + return jprofileStop(workerPid, targetDir); + } + return null; + } + + private String jmapDumpCmd(String pid, String targetDir) { + return profileCmd + " " + pid + " jmap " + targetDir; + } + + private String jstackDumpCmd(String pid, String targetDir) { + return profileCmd + " " + pid + " jstack " + targetDir; + } + + private String jprofileStart(String pid) { + return profileCmd + " " + pid + " start"; + } + + private String jprofileStop(String pid, String targetDir) { + return profileCmd + " " + pid + " stop " + targetDir; + } + + private String jprofileDump(String pid, String targetDir) { + return profileCmd + " " + pid + " dump " + targetDir; + } + + private String jprofileJvmRestart(String pid) { + return profileCmd + " " + pid + " kill"; + } + +} 
http://git-wip-us.apache.org/repos/asf/storm/blob/08934e29/storm-core/src/jvm/org/apache/storm/daemon/supervisor/ShutdownWork.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/ShutdownWork.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/ShutdownWork.java new file mode 100644 index 0000000..674454b --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/ShutdownWork.java @@ -0,0 +1,125 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.daemon.supervisor; + +import org.apache.commons.lang.StringUtils; +import org.apache.storm.Config; +import org.apache.storm.ProcessSimulator; +import org.apache.storm.daemon.Shutdownable; +import org.apache.storm.utils.ConfigUtils; +import org.apache.storm.utils.Time; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.util.*; + +public abstract class ShutdownWork implements Shutdownable { + + private static Logger LOG = LoggerFactory.getLogger(ShutdownWork.class); + + public void shutWorker(SupervisorData supervisorData, String workerId) throws IOException, InterruptedException { + + LOG.info("Shutting down {}:{}", supervisorData.getSupervisorId(), workerId); + Map conf = supervisorData.getConf(); + Collection<String> pids = Utils.readDirContents(ConfigUtils.workerPidsRoot(conf, workerId)); + Integer shutdownSleepSecs = (Integer) conf.get(Config.SUPERVISOR_WORKER_SHUTDOWN_SLEEP_SECS); + Boolean asUser = Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false); + String user = ConfigUtils.getWorkerUser(conf, workerId); + String threadPid = supervisorData.getWorkerThreadPidsAtom().get(workerId); + if (StringUtils.isNotBlank(threadPid)) { + ProcessSimulator.killProcess(threadPid); + } + + for (String pid : pids) { + if (asUser) { + List<String> commands = new ArrayList<>(); + commands.add("signal"); + commands.add(pid); + commands.add("15"); + String logPrefix = "kill - 15 " + pid; + SupervisorUtils.workerLauncherAndWait(conf, user, commands, null, logPrefix); + } else { + Utils.killProcessWithSigTerm(pid); + } + } + + if (pids.size() > 0) { + LOG.info("Sleep {} seconds for execution of cleanup threads on worker.", shutdownSleepSecs); + Time.sleepSecs(shutdownSleepSecs); + } + + for (String pid : pids) { + if (asUser) { + List<String> commands = new ArrayList<>(); + commands.add("signal"); + commands.add(pid); + 
commands.add("9"); + String logPrefix = "kill - 9 " + pid; + SupervisorUtils.workerLauncherAndWait(conf, user, commands, null, logPrefix); + } else { + Utils.forceKillProcess(pid); + } + String path = ConfigUtils.workerPidPath(conf, workerId, pid); + if (asUser) { + SupervisorUtils.rmrAsUser(conf, workerId, path); + } else { + try { + LOG.debug("Removing path {}", path); + new File(path).delete(); + } catch (Exception e) { + // on windows, the supervisor may still holds the lock on the worker directory + // ignore + } + } + } + tryCleanupWorker(conf, supervisorData, workerId); + LOG.info("Shut down {}:{}", supervisorData.getSupervisorId(), workerId); + + } + + protected void tryCleanupWorker(Map conf, SupervisorData supervisorData, String workerId) { + try { + String workerRoot = ConfigUtils.workerRoot(conf, workerId); + if (Utils.checkFileExists(workerRoot)) { + if (Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) { + SupervisorUtils.rmrAsUser(conf, workerId, workerRoot); + } else { + Utils.forceDelete(ConfigUtils.workerHeartbeatsRoot(conf, workerId)); + Utils.forceDelete(ConfigUtils.workerPidsRoot(conf, workerId)); + Utils.forceDelete(ConfigUtils.workerRoot(conf, workerId)); + } + ConfigUtils.removeWorkerUserWSE(conf, workerId); + supervisorData.getDeadWorkers().remove(workerId); + } + if (conf.get(Config.STORM_RESOURCE_ISOLATION_PLUGIN_ENABLE) != null) { + supervisorData.getResourceIsolationManager().releaseResourcesForWorker(workerId); + } + } catch (IOException e) { + LOG.warn("{} Failed to cleanup worker {}. Will retry later", e, workerId); + } catch (RuntimeException e) { + LOG.warn("{} Failed to cleanup worker {}. 
Will retry later", e, workerId); + } + } + + @Override + public void shutdown() { + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/08934e29/storm-core/src/jvm/org/apache/storm/daemon/supervisor/StandaloneSupervisor.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/StandaloneSupervisor.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/StandaloneSupervisor.java new file mode 100644 index 0000000..da54b88 --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/StandaloneSupervisor.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.daemon.supervisor; + +import org.apache.storm.Config; +import org.apache.storm.scheduler.ISupervisor; +import org.apache.storm.utils.LocalState; + +import java.io.IOException; +import java.util.Collection; +import java.util.Map; +import java.util.UUID; + +public class StandaloneSupervisor implements ISupervisor { + + private String supervisorId; + + private Map conf; + + @Override + public void prepare(Map stormConf, String schedulerLocalDir) { + try { + LocalState localState = new LocalState(schedulerLocalDir); + String supervisorId = localState.getSupervisorId(); + if (supervisorId == null) { + supervisorId = UUID.randomUUID().toString(); + localState.setSupervisorId(supervisorId); + } + this.conf = stormConf; + this.supervisorId = supervisorId; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public String getSupervisorId() { + return supervisorId; + } + + @Override + public String getAssignmentId() { + return supervisorId; + } + + @Override + // @return is vector which need be converted to be int + public Object getMetadata() { + Object ports = conf.get(Config.SUPERVISOR_SLOTS_PORTS); + return ports; + } + + @Override + public boolean confirmAssigned(int port) { + return true; + } + + @Override + public void killedWorker(int port) { + + } + + @Override + public void assigned(Collection<Integer> ports) { + + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/08934e29/storm-core/src/jvm/org/apache/storm/daemon/supervisor/State.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/State.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/State.java new file mode 100644 index 0000000..1913c91 --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/State.java @@ -0,0 +1,22 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
/**
 * Classification values used by the supervisor.
 *
 * <p>NOTE(review): the descriptions below are read off the constant names;
 * the code that assigns them is not visible here — confirm before relying on
 * the exact semantics.
 */
public enum State {
    /** Presumably: healthy. */
    valid,
    /** Presumably: should not be running. */
    disallowed,
    /** Presumably: has not started yet. */
    notStarted,
    /** Presumably: stopped reporting in time. */
    timedOut
}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.daemon.supervisor; + +import org.apache.commons.lang.builder.ToStringBuilder; +import org.apache.commons.lang.builder.ToStringStyle; +import org.apache.storm.generated.LSWorkerHeartbeat; + +public class StateHeartbeat { + private State state; + private LSWorkerHeartbeat hb; + + public StateHeartbeat(State state, LSWorkerHeartbeat hb) { + this.state = state; + this.hb = hb; + } + + public State getState() { + return this.state; + } + + public LSWorkerHeartbeat getHeartbeat() { + return this.hb; + } + + @Override + public String toString() { + return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/08934e29/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorDaemon.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorDaemon.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorDaemon.java new file mode 100644 index 0000000..115c7c6 --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorDaemon.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
/**
 * Handle on a running supervisor: exposes its id and configuration and lets
 * the caller shut down all of its workers.
 */
public interface SupervisorDaemon {

    /** @return the supervisor's id */
    String getId();

    /** @return the supervisor's configuration map */
    Map getConf();

    /**
     * Shuts down every worker of this supervisor.
     *
     * <p>NOTE(review): whether this blocks until the workers are gone is up to
     * the implementation — confirm against the implementers.
     */
    void shutdownAllWorkers();
}
+ * See the License for the specific language governing permissions and + * limitations under the License. + */
package org.apache.storm.daemon.supervisor;

import org.apache.storm.Config;
import org.apache.storm.StormTimer;
import org.apache.storm.cluster.ClusterStateContext;
import org.apache.storm.cluster.ClusterUtils;
import org.apache.storm.cluster.DaemonType;
import org.apache.storm.cluster.IStormClusterState;
import org.apache.storm.container.cgroup.CgroupManager;
import org.apache.storm.generated.LocalAssignment;
import org.apache.storm.generated.ProfileRequest;
import org.apache.storm.localizer.Localizer;
import org.apache.storm.messaging.IContext;
import org.apache.storm.scheduler.ISupervisor;
import org.apache.storm.utils.ConfigUtils;
import org.apache.storm.utils.LocalState;
import org.apache.storm.utils.Utils;
import org.apache.storm.utils.VersionInfo;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
import org.eclipse.jetty.util.ConcurrentHashSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Holder for the state shared by the supervisor's components: the configuration, the cluster-state
 * client, the local state, the three timers, the localizer and several bookkeeping maps.
 *
 * <p>The constructor builds every collaborator eagerly; any failure while doing so is rethrown as a
 * runtime exception via {@link Utils#wrapInRuntime(Exception)}.
 */
public class SupervisorData {

    private static final Logger LOG = LoggerFactory.getLogger(SupervisorData.class);

    private Map conf;
    private IContext sharedContext;
    private volatile boolean active;
    private ISupervisor iSupervisor;
    private Utils.UptimeComputer upTime;
    private String stormVersion;

    private ConcurrentHashMap<String, String> workerThreadPidsAtom; // for local mode

    private IStormClusterState stormClusterState;

    private LocalState localState;

    private String supervisorId;

    private String assignmentId;

    private String hostName;

    // used for reporting used ports when heartbeating
    private ConcurrentHashMap<Long, LocalAssignment> currAssignment;

    private StormTimer heartbeatTimer;

    private StormTimer eventTimer;

    private StormTimer blobUpdateTimer;

    private Localizer localizer;

    private ConcurrentHashMap<String, Map<String, Object>> assignmentVersions;

    private AtomicInteger syncRetry;

    private final Object downloadLock = new Object();

    private ConcurrentHashMap<String, List<ProfileRequest>> stormIdToProfileActions;

    private CgroupManager resourceIsolationManager;

    private ConcurrentHashSet<String> deadWorkers;

    /**
     * Creates the shared supervisor state.
     *
     * @param conf          the storm configuration map
     * @param sharedContext the messaging context handed to this supervisor (stored as given)
     * @param iSupervisor   supplies the supervisor id and the assignment id
     * @throws RuntimeException if the cluster state, local state, localizer, host name or the
     *                          resource isolation plugin cannot be set up
     */
    public SupervisorData(Map conf, IContext sharedContext, ISupervisor iSupervisor) {
        this.conf = conf;
        this.sharedContext = sharedContext;
        this.iSupervisor = iSupervisor;
        this.active = true;
        this.upTime = Utils.makeUptimeComputer();
        this.stormVersion = VersionInfo.getVersion();
        this.workerThreadPidsAtom = new ConcurrentHashMap<String, String>();
        this.deadWorkers = new ConcurrentHashSet<String>();

        List<ACL> acls = null;
        if (Utils.isZkAuthenticationConfiguredStormServer(conf)) {
            acls = new ArrayList<>();
            acls.add(ZooDefs.Ids.CREATOR_ALL_ACL.get(0));
            // Permission flags are combined with bitwise OR; the original XOR only produced the
            // same mask because the two flags happen not to share any bit.
            acls.add(new ACL((ZooDefs.Perms.READ | ZooDefs.Perms.CREATE), ZooDefs.Ids.ANYONE_ID_UNSAFE));
        }
        try {
            this.stormClusterState = ClusterUtils.mkStormClusterState(conf, acls, new ClusterStateContext(DaemonType.SUPERVISOR));
        } catch (Exception e) {
            // Log the cause as well, otherwise the reason is only visible to whoever catches the rethrow.
            LOG.error("supervisor can't create stormClusterState", e);
            throw Utils.wrapInRuntime(e);
        }

        try {
            this.localState = ConfigUtils.supervisorState(conf);
            this.localizer = Utils.createLocalizer(conf, ConfigUtils.supervisorLocalDir(conf));
        } catch (IOException e) {
            throw Utils.wrapInRuntime(e);
        }
        this.supervisorId = iSupervisor.getSupervisorId();
        this.assignmentId = iSupervisor.getAssignmentId();

        try {
            this.hostName = Utils.hostname(conf);
        } catch (UnknownHostException e) {
            throw Utils.wrapInRuntime(e);
        }

        this.currAssignment = new ConcurrentHashMap<>();

        this.heartbeatTimer = new StormTimer(null, new DefaultUncaughtExceptionHandler());

        this.eventTimer = new StormTimer(null, new DefaultUncaughtExceptionHandler());

        this.blobUpdateTimer = new StormTimer("blob-update-timer", new DefaultUncaughtExceptionHandler());

        this.assignmentVersions = new ConcurrentHashMap<>();
        this.syncRetry = new AtomicInteger(0);
        this.stormIdToProfileActions = new ConcurrentHashMap<>();
        if (Utils.getBoolean(conf.get(Config.STORM_RESOURCE_ISOLATION_PLUGIN_ENABLE), false)) {
            try {
                this.resourceIsolationManager = (CgroupManager) Utils.newInstance((String) conf.get(Config.STORM_RESOURCE_ISOLATION_PLUGIN));
                this.resourceIsolationManager.prepare(conf);
                LOG.info("Using resource isolation plugin {} {}", conf.get(Config.STORM_RESOURCE_ISOLATION_PLUGIN), resourceIsolationManager);
            } catch (IOException e) {
                throw Utils.wrapInRuntime(e);
            }
        } else {
            this.resourceIsolationManager = null;
        }
    }

    public ConcurrentHashMap<String, List<ProfileRequest>> getStormIdToProfileActions() {
        return stormIdToProfileActions;
    }

    /**
     * Replaces the content of the profile-action map with the given entries.
     *
     * <p>NOTE(review): clear() followed by putAll() is two separate operations, so a concurrent
     * reader can observe an empty or partially filled map in between.
     */
    public void setStormIdToProfileActions(Map<String, List<ProfileRequest>> stormIdToProfileActions) {
        this.stormIdToProfileActions.clear();
        this.stormIdToProfileActions.putAll(stormIdToProfileActions);
    }

    public Map getConf() {
        return conf;
    }

    public void setConf(Map conf) {
        this.conf = conf;
    }

    public IContext getSharedContext() {
        return sharedContext;
    }

    public void setSharedContext(IContext sharedContext) {
        this.sharedContext = sharedContext;
    }

    public boolean isActive() {
        return active;
    }

    public void setActive(boolean active) {
        this.active = active;
    }

    public ISupervisor getiSupervisor() {
        return iSupervisor;
    }

    public void setiSupervisor(ISupervisor iSupervisor) {
        this.iSupervisor = iSupervisor;
    }

    public Utils.UptimeComputer getUpTime() {
        return upTime;
    }

    public void setUpTime(Utils.UptimeComputer upTime) {
        this.upTime = upTime;
    }

    public String getStormVersion() {
        return stormVersion;
    }

    public void setStormVersion(String stormVersion) {
        this.stormVersion = stormVersion;
    }

    public ConcurrentHashMap<String, String> getWorkerThreadPidsAtom() {
        return workerThreadPidsAtom;
    }

    public void setWorkerThreadPidsAtom(ConcurrentHashMap<String, String> workerThreadPidsAtom) {
        this.workerThreadPidsAtom = workerThreadPidsAtom;
    }

    public IStormClusterState getStormClusterState() {
        return stormClusterState;
    }

    public void setStormClusterState(IStormClusterState stormClusterState) {
        this.stormClusterState = stormClusterState;
    }

    public LocalState getLocalState() {
        return localState;
    }

    public void setLocalState(LocalState localState) {
        this.localState = localState;
    }

    public String getSupervisorId() {
        return supervisorId;
    }

    public void setSupervisorId(String supervisorId) {
        this.supervisorId = supervisorId;
    }

    public String getAssignmentId() {
        return assignmentId;
    }

    public void setAssignmentId(String assignmentId) {
        this.assignmentId = assignmentId;
    }

    public String getHostName() {
        return hostName;
    }

    public void setHostName(String hostName) {
        this.hostName = hostName;
    }

    public ConcurrentHashMap<Long, LocalAssignment> getCurrAssignment() {
        return currAssignment;
    }

    /**
     * Replaces the content of the current-assignment map with the given entries.
     *
     * <p>NOTE(review): clear() followed by putAll() is two separate operations, so a concurrent
     * reader can observe an empty or partially filled map in between.
     */
    public void setCurrAssignment(Map<Long, LocalAssignment> currAssignment) {
        this.currAssignment.clear();
        this.currAssignment.putAll(currAssignment);
    }

    public StormTimer getHeartbeatTimer() {
        return heartbeatTimer;
    }

    public void setHeartbeatTimer(StormTimer heartbeatTimer) {
        this.heartbeatTimer = heartbeatTimer;
    }

    public StormTimer getEventTimer() {
        return eventTimer;
    }

    public void setEventTimer(StormTimer eventTimer) {
        this.eventTimer = eventTimer;
    }

    public StormTimer getBlobUpdateTimer() {
        return blobUpdateTimer;
    }

    public void setBlobUpdateTimer(StormTimer blobUpdateTimer) {
        this.blobUpdateTimer = blobUpdateTimer;
    }

    public Localizer getLocalizer() {
        return localizer;
    }

    public void setLocalizer(Localizer localizer) {
        this.localizer = localizer;
    }

    public AtomicInteger getSyncRetry() {
        return syncRetry;
    }

    public void setSyncRetry(AtomicInteger syncRetry) {
        this.syncRetry = syncRetry;
    }

    public ConcurrentHashMap<String, Map<String, Object>> getAssignmentVersions() {
        return assignmentVersions;
    }

    /**
     * Replaces the content of the assignment-version map with the given entries (clear, then putAll).
     */
    public void setAssignmentVersions(Map<String, Map<String, Object>> assignmentVersions) {
        this.assignmentVersions.clear();
        this.assignmentVersions.putAll(assignmentVersions);
    }

    /**
     * @return the resource isolation manager, or {@code null} when the isolation plugin is not enabled
     */
    public CgroupManager getResourceIsolationManager() {
        return resourceIsolationManager;
    }

    public void setResourceIsolationManager(CgroupManager resourceIsolationManager) {
        this.resourceIsolationManager = resourceIsolationManager;
    }

    public Object getDownloadLock() {
        return downloadLock;
    }

    public ConcurrentHashSet<String> getDeadWorkers() {
        return deadWorkers;
    }

    public void setDeadWorkers(ConcurrentHashSet<String> deadWorkers) {
        this.deadWorkers = deadWorkers;
    }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/08934e29/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorHeartbeat.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorHeartbeat.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorHeartbeat.java new file mode 100644 index 0000000..399dcd2 --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorHeartbeat.java @@ -0,0 +1,84 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
package org.apache.storm.daemon.supervisor;

import org.apache.storm.Config;
import org.apache.storm.cluster.IStormClusterState;
import org.apache.storm.generated.SupervisorInfo;
import org.apache.storm.utils.Time;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Runnable that builds a fresh {@link SupervisorInfo} from the current {@link SupervisorData} and
 * publishes it through {@link IStormClusterState#supervisorHeartbeat}.
 */
public class SupervisorHeartbeat implements Runnable {

    private final IStormClusterState stormClusterState;
    private final String supervisorId;
    private final Map conf;
    private final SupervisorData supervisorData;

    /**
     * @param conf           the storm configuration map (scheduler meta and capacities are read from it)
     * @param supervisorData shared supervisor state; the cluster state and supervisor id are captured
     *                       here, everything else is read on each run
     */
    public SupervisorHeartbeat(Map conf, SupervisorData supervisorData) {
        this.stormClusterState = supervisorData.getStormClusterState();
        this.supervisorId = supervisorData.getSupervisorId();
        this.supervisorData = supervisorData;
        this.conf = conf;
    }

    /**
     * Builds the heartbeat payload from the current supervisor state.
     *
     * @return a newly created, fully populated {@link SupervisorInfo}
     */
    @SuppressWarnings("unchecked") // the configuration map is untyped
    private SupervisorInfo update(Map conf, SupervisorData supervisorData) {
        SupervisorInfo supervisorInfo = new SupervisorInfo();
        supervisorInfo.set_time_secs(Time.currentTimeSecs());
        supervisorInfo.set_hostname(supervisorData.getHostName());
        supervisorInfo.set_assignment_id(supervisorData.getAssignmentId());

        // The keys of the current assignment are the ports in use.
        List<Long> usedPorts = new ArrayList<>(supervisorData.getCurrAssignment().keySet());
        supervisorInfo.set_used_ports(usedPorts);

        List<Long> portList = new ArrayList<>();
        Object metas = supervisorData.getiSupervisor().getMetadata();
        if (metas != null) {
            // Accept any numeric port type; casting the elements to Integer would fail on Long ports.
            for (Object port : (Iterable<?>) metas) {
                portList.add(((Number) port).longValue());
            }
        }
        supervisorInfo.set_meta(portList);
        supervisorInfo.set_scheduler_meta((Map<String, String>) conf.get(Config.SUPERVISOR_SCHEDULER_META));
        supervisorInfo.set_uptime_secs(supervisorData.getUpTime().upTime());
        supervisorInfo.set_version(supervisorData.getStormVersion());
        supervisorInfo.set_resources_map(mkSupervisorCapacities(conf));
        return supervisorInfo;
    }

    /**
     * Reads the configured memory and CPU capacities.
     *
     * @return a map from the two capacity config keys to their values as doubles
     * @throws IllegalArgumentException if a capacity is missing or not numeric
     */
    private Map<String, Double> mkSupervisorCapacities(Map conf) {
        Map<String, Double> ret = new HashMap<String, Double>();
        ret.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, capacityOf(conf, Config.SUPERVISOR_MEMORY_CAPACITY_MB));
        ret.put(Config.SUPERVISOR_CPU_CAPACITY, capacityOf(conf, Config.SUPERVISOR_CPU_CAPACITY));
        return ret;
    }

    /**
     * Converts a numeric config value to a double. A plain {@code (double)} cast only works when the
     * stored value is already a {@code Double}; a capacity written as an integer would throw
     * {@code ClassCastException}.
     */
    private static Double capacityOf(Map conf, String key) {
        Object value = conf.get(key);
        if (!(value instanceof Number)) {
            throw new IllegalArgumentException(key + " must be a number but was: " + value);
        }
        return ((Number) value).doubleValue();
    }

    @Override
    public void run() {
        SupervisorInfo supervisorInfo = update(conf, supervisorData);
        stormClusterState.supervisorHeartbeat(supervisorId, supervisorInfo);
    }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/08934e29/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorManger.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorManger.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorManger.java new file mode 100644 index 0000000..acc2cb8 --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorManger.java @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
package org.apache.storm.daemon.supervisor;

import org.apache.storm.event.EventManager;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collection;
import java.util.Map;

/**
 * Handle on a running supervisor: shuts it down, shuts down its workers and reports whether its
 * timers and event managers are idle. As a {@link Runnable} it simply performs {@link #shutdown()},
 * which lets an instance be registered as a shutdown hook.
 */
public class SupervisorManger extends ShutdownWork implements SupervisorDaemon, DaemonCommon, Runnable {

    private static final Logger LOG = LoggerFactory.getLogger(SupervisorManger.class);

    private final EventManager eventManager;

    private final EventManager processesEventManager;

    private final SupervisorData supervisorData;

    public SupervisorManger(SupervisorData supervisorData, EventManager eventManager, EventManager processesEventManager) {
        this.eventManager = eventManager;
        this.supervisorData = supervisorData;
        this.processesEventManager = processesEventManager;
    }

    /**
     * Marks the supervisor inactive, closes the three timers and both event managers, and
     * disconnects from the cluster state.
     *
     * @throws RuntimeException wrapping any exception raised while closing a timer or event manager
     */
    @Override
    public void shutdown() {
        LOG.info("Shutting down supervisor {}", supervisorData.getSupervisorId());
        supervisorData.setActive(false);
        try {
            supervisorData.getHeartbeatTimer().close();
            supervisorData.getEventTimer().close();
            supervisorData.getBlobUpdateTimer().close();
            eventManager.close();
            processesEventManager.close();
        } catch (Exception e) {
            throw Utils.wrapInRuntime(e);
        } finally {
            // Always release the cluster-state connection, even when one of the close() calls failed.
            supervisorData.getStormClusterState().disconnect();
        }
    }

    /**
     * Shuts down every worker whose id is listed by {@link SupervisorUtils#supervisorWorkerIds}.
     * Stops at the first worker that fails.
     *
     * @throws RuntimeException wrapping the failure of a worker shutdown
     */
    @Override
    public void shutdownAllWorkers() {
        Collection<String> workerIds = SupervisorUtils.supervisorWorkerIds(supervisorData.getConf());
        try {
            for (String workerId : workerIds) {
                shutWorker(supervisorData, workerId);
            }
        } catch (Exception e) {
            LOG.error("shutWorker failed", e);
            throw Utils.wrapInRuntime(e);
        }
    }

    @Override
    public Map getConf() {
        return supervisorData.getConf();
    }

    @Override
    public String getId() {
        return supervisorData.getSupervisorId();
    }

    /**
     * @return {@code true} when the supervisor is inactive, or when the heartbeat timer, the event
     *         timer and both event managers all report that they are waiting
     */
    @Override
    public boolean isWaiting() {
        if (!supervisorData.isActive()) {
            return true;
        }
        return supervisorData.getHeartbeatTimer().isTimerWaiting()
                && supervisorData.getEventTimer().isTimerWaiting()
                && eventManager.waiting()
                && processesEventManager.waiting();
    }

    @Override
    public void run() {
        shutdown();
    }

}
http://git-wip-us.apache.org/repos/asf/storm/blob/08934e29/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorServer.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorServer.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorServer.java new file mode 100644 index 0000000..f1dfb8a --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorServer.java @@ -0,0 +1,212 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */
package org.apache.storm.daemon.supervisor;

import com.codahale.metrics.Gauge;
import com.codahale.metrics.MetricRegistry;
import org.apache.commons.io.FileUtils;
import org.apache.storm.Config;
import org.apache.storm.StormTimer;
import org.apache.storm.command.HealthCheck;
import org.apache.storm.daemon.metrics.MetricsUtils;
import org.apache.storm.daemon.metrics.reporters.PreparableReporter;
import org.apache.storm.event.EventManagerImp;
import org.apache.storm.localizer.Localizer;
import org.apache.storm.messaging.IContext;
import org.apache.storm.scheduler.ISupervisor;
import org.apache.storm.utils.ConfigUtils;
import org.apache.storm.utils.Utils;
import org.apache.storm.utils.VersionInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.InterruptedIOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
 * Entry point of the supervisor daemon: wires up {@link SupervisorData}, the heartbeat, the event
 * managers and the recurring timer tasks, and exposes the local and distributed launch paths.
 */
public class SupervisorServer extends ShutdownWork {
    private static final Logger LOG = LoggerFactory.getLogger(SupervisorServer.class);

    /**
     * Builds and starts a supervisor. In local state, the supervisor stores who its current
     * assignments are; another thread launches events to restart any dead processes if necessary.
     *
     * @param conf          the storm configuration map
     * @param sharedContext messaging context passed through to {@link SupervisorData}
     * @param iSupervisor   the supervisor implementation; it is prepared here
     * @return the manager for the started supervisor, or {@code null} if initialization failed and
     *         {@link Utils#exitProcess} returned
     * @throws Exception only when the failure was caused by an interruption
     *                   ({@link InterruptedException} or {@link InterruptedIOException})
     */
    private SupervisorManger mkSupervisor(final Map conf, IContext sharedContext, ISupervisor iSupervisor) throws Exception {
        SupervisorManger supervisorManger = null;
        try {
            LOG.info("Starting Supervisor with conf {}", conf);
            iSupervisor.prepare(conf, ConfigUtils.supervisorIsupervisorDir(conf));
            String path = ConfigUtils.supervisorTmpDir(conf);
            FileUtils.cleanDirectory(new File(path));

            final SupervisorData supervisorData = new SupervisorData(conf, sharedContext, iSupervisor);
            Localizer localizer = supervisorData.getLocalizer();

            // Heartbeat once synchronously before scheduling the recurring heartbeat.
            SupervisorHeartbeat hb = new SupervisorHeartbeat(conf, supervisorData);
            hb.run();
            // should synchronize supervisor so it doesn't launch anything after being down (optimization)
            Integer heartbeatFrequency = (Integer) conf.get(Config.SUPERVISOR_HEARTBEAT_FREQUENCY_SECS);
            supervisorData.getHeartbeatTimer().scheduleRecurring(0, heartbeatFrequency, hb);

            Set<String> downloadedStormIds = SupervisorUtils.readDownLoadedStormIds(conf);
            for (String stormId : downloadedStormIds) {
                SupervisorUtils.addBlobReferences(localizer, stormId, conf);
            }
            // do this after adding the references so we don't try to clean things being used
            localizer.startCleaner();

            EventManagerImp syncSupEventManager = new EventManagerImp(false);
            EventManagerImp syncProcessManager = new EventManagerImp(false);
            SyncProcessEvent syncProcessEvent = new SyncProcessEvent(supervisorData);
            SyncSupervisorEvent syncSupervisorEvent = new SyncSupervisorEvent(supervisorData, syncProcessEvent, syncSupEventManager, syncProcessManager);
            UpdateBlobs updateBlobsThread = new UpdateBlobs(supervisorData);
            RunProfilerActions runProfilerActionThread = new RunProfilerActions(supervisorData);

            if ((Boolean) conf.get(Config.SUPERVISOR_ENABLE)) {
                StormTimer eventTimer = supervisorData.getEventTimer();
                // This isn't strictly necessary, but it doesn't hurt and ensures that the machine stays up
                // to date even if callbacks don't all work exactly right
                eventTimer.scheduleRecurring(0, 10, new EventManagerPushCallback(syncSupervisorEvent, syncSupEventManager));

                eventTimer.scheduleRecurring(0, (Integer) conf.get(Config.SUPERVISOR_MONITOR_FREQUENCY_SECS),
                        new EventManagerPushCallback(syncProcessEvent, syncProcessManager));

                // Blob update thread. Starts with 30 seconds delay, every 30 seconds
                supervisorData.getBlobUpdateTimer().scheduleRecurring(30, 30, new EventManagerPushCallback(updateBlobsThread, syncSupEventManager));

                // supervisor health check
                eventTimer.scheduleRecurring(300, 300, new Runnable() {
                    @Override
                    public void run() {
                        if (HealthCheck.healthCheck(conf) == 0) {
                            return;
                        }
                        // Only list the worker directory when the check actually failed.
                        for (String workerId : SupervisorUtils.supervisorWorkerIds(conf)) {
                            try {
                                shutWorker(supervisorData, workerId);
                            } catch (Exception e) {
                                throw Utils.wrapInRuntime(e);
                            }
                        }
                    }
                });

                // Launch a thread that Runs profiler commands . Starts with 30 seconds delay, every 30 seconds
                eventTimer.scheduleRecurring(30, 30, new EventManagerPushCallback(runProfilerActionThread, syncSupEventManager));
            }
            supervisorManger = new SupervisorManger(supervisorData, syncSupEventManager, syncProcessManager);
        } catch (Throwable t) {
            if (Utils.exceptionCauseIsInstanceOf(InterruptedIOException.class, t)
                    || Utils.exceptionCauseIsInstanceOf(InterruptedException.class, t)) {
                throw t;
            }
            // Log the throwable itself; without it the reason for the exit is lost.
            LOG.error("Error on initialization of server supervisor", t);
            Utils.exitProcess(13, "Error on initialization");
        }
        return supervisorManger;
    }

    /**
     * start local supervisor
     */
    public void localLaunch() {
        LOG.info("Starting supervisor for storm version '{}'.", VersionInfo.getVersion());
        try {
            Map<Object, Object> conf = Utils.readStormConfig();
            if (!ConfigUtils.isLocalMode(conf)) {
                throw new IllegalArgumentException("Cannot start server in distribute mode!");
            }
            ISupervisor iSupervisor = new StandaloneSupervisor();
            SupervisorManger supervisorManager = mkSupervisor(conf, null, iSupervisor);
            if (supervisorManager != null) {
                Utils.addShutdownHookWithForceKillIn1Sec(supervisorManager);
            }
        } catch (Exception e) {
            LOG.error("Failed to start supervisor\n", e);
            System.exit(1);
        }
    }

    /**
     * start distribute supervisor
     */
    private void distributeLaunch() {
        LOG.info("Starting supervisor for storm version '{}'.", VersionInfo.getVersion());
        try {
            Map<Object, Object> conf = Utils.readStormConfig();
            if (ConfigUtils.isLocalMode(conf)) {
                throw new IllegalArgumentException("Cannot start server in local mode!");
            }
            ISupervisor iSupervisor = new StandaloneSupervisor();
            SupervisorManger supervisorManager = mkSupervisor(conf, null, iSupervisor);
            if (supervisorManager != null) {
                Utils.addShutdownHookWithForceKillIn1Sec(supervisorManager);
            }
            registerWorkerNumGauge("drpc:num-execute-http-requests", conf);
            startMetricsReporters(conf);
        } catch (Exception e) {
            LOG.error("Failed to start supervisor\n", e);
            System.exit(1);
        }
    }

    // To be removed
    // NOTE(review): the gauge is registered on a MetricRegistry created here and never handed to a
    // reporter (startMetricsReporters builds its own registries), so it does not look like this
    // value is ever reported. The name passed by the caller also reads like a DRPC metric - confirm
    // before relying on it.
    private void registerWorkerNumGauge(String name, final Map conf) {
        MetricRegistry metricRegistry = new MetricRegistry();
        metricRegistry.remove(name);
        metricRegistry.register(name, new Gauge<Integer>() {
            @Override
            public Integer getValue() {
                Collection<String> pids = Utils.readDirContents(ConfigUtils.workerRoot(conf));
                return pids.size();
            }
        });
    }

    // To be removed
    // Prepares and starts every configured reporter, each with its own new MetricRegistry.
    private void startMetricsReporters(Map conf) {
        List<PreparableReporter> preparableReporters = MetricsUtils.getPreparableReporters(conf);
        for (PreparableReporter reporter : preparableReporters) {
            reporter.prepare(new MetricRegistry(), conf);
            reporter.start();
        }
        LOG.info("Started statistics report plugin...");
    }

    /**
     * supervisor daemon enter entrance
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Utils.setupDefaultUncaughtExceptionHandler();
        SupervisorServer instance = new SupervisorServer();
        instance.distributeLaunch();
    }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/08934e29/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java ---------------------------------------------------------------------- diff
--git a/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java new file mode 100644 index 0000000..ffdb839 --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java @@ -0,0 +1,173 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
package org.apache.storm.daemon.supervisor;

import org.apache.commons.lang.StringUtils;
import org.apache.curator.utils.PathUtils;
import org.apache.storm.Config;
import org.apache.storm.localizer.LocalResource;
import org.apache.storm.localizer.Localizer;
import org.apache.storm.utils.ConfigUtils;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.net.URLDecoder;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
 * Static helpers used by the supervisor: running the worker-launcher binary, blob bookkeeping and
 * inspecting the supervisor's local directories.
 */
public class SupervisorUtils {

    private static final Logger LOG = LoggerFactory.getLogger(SupervisorUtils.class);

    /**
     * Starts the worker-launcher for the given user without waiting for it to finish.
     *
     * <p>The launcher binary is taken from {@link Config#SUPERVISOR_WORKER_LAUNCHER}; when that is
     * blank, {@code <storm.home>/bin/worker-launcher} is used, where {@code storm.home} is read
     * from the system properties.
     *
     * @param user the user passed to the launcher as its first argument; must not be blank
     * @param args further launcher arguments, appended after the user
     * @return the started process
     * @throws IllegalArgumentException if {@code user} is blank
     * @throws IOException              if the process cannot be launched
     */
    public static Process workerLauncher(Map conf, String user, List<String> args, Map<String, String> environment, final String logPreFix,
            final Utils.ExitCodeCallable exitCodeCallback, File dir) throws IOException {
        if (StringUtils.isBlank(user)) {
            throw new IllegalArgumentException("User cannot be blank when calling workerLauncher.");
        }
        String wlinitial = (String) (conf.get(Config.SUPERVISOR_WORKER_LAUNCHER));
        String stormHome = System.getProperty("storm.home");
        String wl;
        if (StringUtils.isNotBlank(wlinitial)) {
            wl = wlinitial;
        } else {
            wl = stormHome + "/bin/worker-launcher";
        }
        List<String> commands = new ArrayList<>();
        commands.add(wl);
        commands.add(user);
        commands.addAll(args);
        return Utils.launchProcess(commands, environment, logPreFix, exitCodeCallback, dir);
    }

    /**
     * Runs the worker-launcher and waits for it to exit. When {@code logPreFix} is not blank the
     * process output is read and logged under that prefix.
     *
     * @return the exit value of the launcher process
     * @throws IOException if the process cannot be launched
     */
    public static int workerLauncherAndWait(Map conf, String user, List<String> args, final Map<String, String> environment, final String logPreFix)
            throws IOException {
        Process process = workerLauncher(conf, user, args, environment, logPreFix, null, null);
        if (StringUtils.isNotBlank(logPreFix)) {
            Utils.readAndLogStream(logPreFix, process.getInputStream());
        }
        try {
            process.waitFor();
        } catch (InterruptedException e) {
            LOG.info("{} interrupted.", logPreFix);
            // Restore the interrupt flag so callers further up can still see the interruption.
            Thread.currentThread().interrupt();
        }
        return process.exitValue();
    }

    /**
     * When workers run as the submitting user, invokes the launcher's {@code code-dir} command on
     * {@code dir} as that user; otherwise does nothing.
     */
    public static void setupStormCodeDir(Map conf, Map stormConf, String dir) throws IOException {
        if (Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) {
            String logPrefix = "setup conf for " + dir;
            List<String> commands = new ArrayList<>();
            commands.add("code-dir");
            commands.add(dir);
            workerLauncherAndWait(conf, (String) (stormConf.get(Config.TOPOLOGY_SUBMITTER_USER)), commands, null, logPrefix);
        }
    }

    /**
     * Deletes {@code path} through the launcher's {@code rmr} command, run as the owner of the path.
     *
     * @throws RuntimeException if the path still exists afterwards
     */
    public static void rmrAsUser(Map conf, String id, String path) throws IOException {
        String user = Utils.getFileOwner(path);
        String logPreFix = "rmr " + id;
        List<String> commands = new ArrayList<>();
        commands.add("rmr");
        commands.add(path);
        SupervisorUtils.workerLauncherAndWait(conf, user, commands, null, logPreFix);
        if (Utils.checkFileExists(path)) {
            throw new RuntimeException(path + " was not deleted.");
        }
    }

    /**
     * Given the blob information returns the value of the uncompress field, handling it either being a string or a boolean value, or if it's not specified then
     * returns false
     *
     * @param blobInfo the settings of one blob
     * @return the {@code "uncompress"} setting; {@code false} when it is absent or of another type
     */
    public static Boolean isShouldUncompressBlob(Map<String, Object> blobInfo) {
        Object uncompress = blobInfo.get("uncompress");
        if (uncompress instanceof Boolean) {
            // A direct cast to String would throw ClassCastException for a real boolean value.
            return (Boolean) uncompress;
        }
        if (uncompress instanceof String) {
            return Boolean.valueOf((String) uncompress);
        }
        return Boolean.FALSE;
    }

    /**
     * Converts a blobstore map into a list of {@link LocalResource}s, one per entry, using the
     * entry key as the resource key and the entry's uncompress setting.
     *
     * @param blobstoreMap map from blob key to that blob's settings; may be {@code null}
     * @return the local resources; empty when {@code blobstoreMap} is {@code null}
     */
    public static List<LocalResource> blobstoreMapToLocalresources(Map<String, Map<String, Object>> blobstoreMap) {
        List<LocalResource> localResourceList = new ArrayList<>();
        if (blobstoreMap != null) {
            for (Map.Entry<String, Map<String, Object>> map : blobstoreMap.entrySet()) {
                LocalResource localResource = new LocalResource(map.getKey(), isShouldUncompressBlob(map.getValue()));
                localResourceList.add(localResource);
            }
        }
        return localResourceList;
    }

    /**
     * For each of the downloaded topologies, adds references to the blobs that the topologies are using. This is used to reconstruct the cache on restart.
     *
     * @param localizer the localizer that receives the references
     * @param stormId   id of the topology whose stored configuration is read
     * @param conf      the storm configuration map
     */
    @SuppressWarnings("unchecked") // the topology configuration map is untyped
    public static void addBlobReferences(Localizer localizer, String stormId, Map conf) throws IOException {
        Map stormConf = ConfigUtils.readSupervisorStormConf(conf, stormId);
        Map<String, Map<String, Object>> blobstoreMap = (Map<String, Map<String, Object>>) stormConf.get(Config.TOPOLOGY_BLOBSTORE_MAP);
        String user = (String) stormConf.get(Config.TOPOLOGY_SUBMITTER_USER);
        String topoName = (String) stormConf.get(Config.TOPOLOGY_NAME);
        List<LocalResource> localresources = SupervisorUtils.blobstoreMapToLocalresources(blobstoreMap);
        if (blobstoreMap != null) {
            localizer.addReferences(localresources, user, topoName);
        }
    }

    /**
     * Lists the entries of the supervisor's storm dist root and URL-decodes each name.
     *
     * @return the decoded entry names
     */
    public static Set<String> readDownLoadedStormIds(Map conf) throws IOException {
        Set<String> stormIds = new HashSet<>();
        String path = ConfigUtils.supervisorStormDistRoot(conf);
        Collection<String> rets = Utils.readDirContents(path);
        for (String ret : rets) {
            // Decode with an explicit charset; the one-argument overload is deprecated and uses the platform default.
            stormIds.add(URLDecoder.decode(ret, "UTF-8"));
        }
        return stormIds;
    }

    /**
     * @return the entries of the worker root directory
     */
    public static Collection<String> supervisorWorkerIds(Map conf) {
        String workerRoot = ConfigUtils.workerRoot(conf);
        return Utils.readDirContents(workerRoot);
    }

    /**
     * Checks that the topology's dist root, code file and conf file exist; outside local mode the
     * jar must exist as well.
     */
    public static boolean checkTopoFilesExist(Map conf, String stormId) throws IOException {
        String stormroot = ConfigUtils.supervisorStormDistRoot(conf, stormId);
        String stormjarpath = ConfigUtils.supervisorStormJarPath(stormroot);
        String stormcodepath = ConfigUtils.supervisorStormCodePath(stormroot);
        String stormconfpath = ConfigUtils.supervisorStormConfPath(stormroot);
        if (!Utils.checkFileExists(stormroot)) {
            return false;
        }
        if (!Utils.checkFileExists(stormcodepath)) {
            return false;
        }
        if (!Utils.checkFileExists(stormconfpath)) {
            return false;
        }
        if (!ConfigUtils.isLocalMode(conf) && !Utils.checkFileExists(stormjarpath)) {
            return false;
        }
        return true;
    }

}
