http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/CgroupManager.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/CgroupManager.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/CgroupManager.java index 8d2ba24..78cfe73 100644 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/CgroupManager.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/CgroupManager.java @@ -36,10 +36,12 @@ import com.alibaba.jstorm.container.cgroup.core.CgroupCore; import com.alibaba.jstorm.container.cgroup.core.CpuCore; import com.alibaba.jstorm.utils.JStormUtils; +/** + * @author Johnfang ([email protected]) + */ public class CgroupManager { - public static final Logger LOG = LoggerFactory - .getLogger(CgroupManager.class); + public static final Logger LOG = LoggerFactory.getLogger(CgroupManager.class); public static final String JSTORM_HIERARCHY_NAME = "jstorm_cpu"; @@ -61,20 +63,16 @@ public class CgroupManager { // "/cgroup/cpu" rootDir = ConfigExtension.getCgroupRootDir(conf); if (rootDir == null) - throw new RuntimeException( - "Check configuration file. The supervisor.cgroup.rootdir is missing."); + throw new RuntimeException("Check configuration file. The supervisor.cgroup.rootdir is missing."); File file = new File(JSTORM_CPU_HIERARCHY_DIR + "/" + rootDir); if (!file.exists()) { - LOG.error(JSTORM_CPU_HIERARCHY_DIR + "/" + rootDir - + " is not existing."); - throw new RuntimeException( - "Check if cgconfig service starts or /etc/cgconfig.conf is consistent with configuration file."); + LOG.error(JSTORM_CPU_HIERARCHY_DIR + "/" + rootDir + " is not existing."); + throw new RuntimeException("Check if cgconfig service starts or /etc/cgconfig.conf is consistent with configuration file."); } center = CgroupCenter.getInstance(); if (center == null) - throw new RuntimeException( - "Cgroup error, please check /proc/cgroups"); + throw new RuntimeException("Cgroup error, please check /proc/cgroups"); this.prepareSubSystem(); } @@ -90,13 +88,10 @@ public class CgroupManager { return value; } - private void setCpuUsageUpperLimit(CpuCore cpuCore, int cpuCoreUpperLimit) - throws IOException { + private void setCpuUsageUpperLimit(CpuCore cpuCore, int cpuCoreUpperLimit) throws IOException { /* - * User cfs_period & cfs_quota to control the upper limit use of cpu - * core e.g. If making a process to fully use two cpu cores, set - * cfs_period_us to 100000 and set cfs_quota_us to 200000 The highest - * value of "cpu core upper limit" is 10 + * User cfs_period & cfs_quota to control the upper limit use of cpu core e.g. If making a process to fully use two cpu cores, set cfs_period_us to + * 100000 and set cfs_quota_us to 200000 The highest value of "cpu core upper limit" is 10 */ cpuCoreUpperLimit = validateCpuUpperLimitValue(cpuCoreUpperLimit); @@ -109,16 +104,13 @@ public class CgroupManager { } } - public String startNewWorker(Map conf, int cpuNum, String workerId) - throws SecurityException, IOException { - CgroupCommon workerGroup = - new CgroupCommon(workerId, h, this.rootCgroup); + public String startNewWorker(Map conf, int cpuNum, String workerId) throws SecurityException, IOException { + CgroupCommon workerGroup = new CgroupCommon(workerId, h, this.rootCgroup); this.center.create(workerGroup); CgroupCore cpu = workerGroup.getCores().get(SubSystemType.cpu); CpuCore cpuCore = (CpuCore) cpu; cpuCore.setCpuShares(cpuNum * ONE_CPU_SLOT); - setCpuUsageUpperLimit(cpuCore, - ConfigExtension.getWorkerCpuCoreUpperLimit(conf)); + setCpuUsageUpperLimit(cpuCore, ConfigExtension.getWorkerCpuCoreUpperLimit(conf)); StringBuilder sb = new StringBuilder(); sb.append("cgexec -g cpu:").append(workerGroup.getName()).append(" "); @@ -126,8 +118,7 @@ public class CgroupManager { } public void shutDownWorker(String workerId, boolean isKilled) { - CgroupCommon workerGroup = - new CgroupCommon(workerId, h, this.rootCgroup); + CgroupCommon workerGroup = new CgroupCommon(workerId, h, this.rootCgroup); try { if (isKilled == false) { for (Integer pid : workerGroup.getTasks()) { @@ -151,9 +142,7 @@ public class CgroupManager { if (h == null) { Set<SubSystemType> types = new HashSet<SubSystemType>(); types.add(SubSystemType.cpu); - h = - new Hierarchy(JSTORM_HIERARCHY_NAME, types, - JSTORM_CPU_HIERARCHY_DIR); + h = new Hierarchy(JSTORM_HIERARCHY_NAME, types, JSTORM_CPU_HIERARCHY_DIR); } rootCgroup = new CgroupCommon(rootDir, h, h.getRootCgroups()); }
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/Heartbeat.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/Heartbeat.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/Heartbeat.java index e55aabe..d003a14 100644 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/Heartbeat.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/Heartbeat.java @@ -36,6 +36,7 @@ import com.alibaba.jstorm.utils.TimeUtils; /** * supervisor Heartbeat, just write SupervisorInfo to ZK + * @author Johnfang ([email protected]) */ class Heartbeat extends RunnableCallback { @@ -67,8 +68,7 @@ class Heartbeat extends RunnableCallback { * @param myHostName */ @SuppressWarnings({ "rawtypes", "unchecked" }) - public Heartbeat(Map conf, StormClusterState stormClusterState, - String supervisorId) { + public Heartbeat(Map conf, StormClusterState stormClusterState, String supervisorId) { String myHostName = JStormServerUtils.getHostName(conf); @@ -77,15 +77,12 @@ class Heartbeat extends RunnableCallback { this.conf = conf; this.myHostName = myHostName; this.startTime = TimeUtils.current_time_secs(); - this.frequence = - JStormUtils.parseInt(conf - .get(Config.SUPERVISOR_HEARTBEAT_FREQUENCY_SECS)); + this.frequence = JStormUtils.parseInt(conf.get(Config.SUPERVISOR_HEARTBEAT_FREQUENCY_SECS)); this.hbUpdateTrigger = new AtomicBoolean(true); initSupervisorInfo(conf); - LOG.info("Successfully init supervisor heartbeat thread, " - + supervisorInfo); + LOG.info("Successfully init supervisor heartbeat thread, " + supervisorInfo); } private void initSupervisorInfo(Map conf) { @@ -96,32 +93,28 @@ class Heartbeat extends RunnableCallback { boolean isLocaliP = false; isLocaliP = myHostName.equals("127.0.0.1"); - if(isLocaliP){ + if (isLocaliP) { throw new Exception("the hostname which supervisor get is localhost"); } - }catch(Exception e1){ + } catch (Exception e1) { LOG.error("get supervisor host error!", e1); throw new RuntimeException(e1); } Set<Integer> ports = JStormUtils.listToSet(portList); - supervisorInfo = - new SupervisorInfo(myHostName, supervisorId, ports); + supervisorInfo = new SupervisorInfo(myHostName, supervisorId, ports); } else { - Set<Integer> ports = JStormUtils.listToSet(portList.subList(0, 1)); - supervisorInfo = - new SupervisorInfo(myHostName, supervisorId, ports); + Set<Integer> ports = JStormUtils.listToSet(portList); + supervisorInfo = new SupervisorInfo(myHostName, supervisorId, ports); } } @SuppressWarnings("unchecked") public void update() { supervisorInfo.setTimeSecs(TimeUtils.current_time_secs()); - supervisorInfo - .setUptimeSecs((int) (TimeUtils.current_time_secs() - startTime)); + supervisorInfo.setUptimeSecs((int) (TimeUtils.current_time_secs() - startTime)); try { - stormClusterState - .supervisor_heartbeat(supervisorId, supervisorInfo); + stormClusterState.supervisor_heartbeat(supervisorId, supervisorInfo); } catch (Exception e) { LOG.error("Failed to update SupervisorInfo to ZK"); http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/Httpserver.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/Httpserver.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/Httpserver.java index fad1346..4ece066 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/Httpserver.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/Httpserver.java @@ -62,6 +62,9 @@ import com.sun.net.httpserver.HttpExchange; import com.sun.net.httpserver.HttpHandler; import com.sun.net.httpserver.HttpServer; +/** + * @author Johnfang ([email protected]) + */ public class Httpserver implements Shutdownable { private static Logger LOG = LoggerFactory.getLogger(Httpserver.class); @@ -119,13 +122,11 @@ public class Httpserver implements Shutdownable { } - public void handlFailure(HttpExchange t, String errorMsg) - throws IOException { + public void handlFailure(HttpExchange t, String errorMsg) throws IOException { LOG.error(errorMsg); byte[] data = errorMsg.getBytes(); - t.sendResponseHeaders(HttpURLConnection.HTTP_BAD_REQUEST, - data.length); + t.sendResponseHeaders(HttpURLConnection.HTTP_BAD_REQUEST, data.length); OutputStream os = t.getResponseBody(); os.write(data); os.close(); @@ -136,8 +137,7 @@ public class Httpserver implements Shutdownable { Map<String, String> paramMap = parseRawQuery(uri.getRawQuery()); LOG.info("Receive command " + paramMap); - String cmd = - paramMap.get(HttpserverUtils.HTTPSERVER_LOGVIEW_PARAM_CMD); + String cmd = paramMap.get(HttpserverUtils.HTTPSERVER_LOGVIEW_PARAM_CMD); if (StringUtils.isBlank(cmd) == true) { handlFailure(t, "Bad Request, Not set command type"); return; @@ -146,16 +146,13 @@ public class Httpserver implements Shutdownable { if (HttpserverUtils.HTTPSERVER_LOGVIEW_PARAM_CMD_SHOW.equals(cmd)) { handleShowLog(t, paramMap); return; - } else if (HttpserverUtils.HTTPSERVER_LOGVIEW_PARAM_CMD_LIST - .equals(cmd)) { + } else if (HttpserverUtils.HTTPSERVER_LOGVIEW_PARAM_CMD_LIST.equals(cmd)) { handleListDir(t, paramMap); return; - } else if (HttpserverUtils.HTTPSERVER_LOGVIEW_PARAM_CMD_JSTACK - .equals(cmd)) { + } else if (HttpserverUtils.HTTPSERVER_LOGVIEW_PARAM_CMD_JSTACK.equals(cmd)) { handleJstack(t, paramMap); return; - } else if (HttpserverUtils.HTTPSERVER_LOGVIEW_PARAM_CMD_SHOW_CONF - .equals(cmd)) { + } else if (HttpserverUtils.HTTPSERVER_LOGVIEW_PARAM_CMD_SHOW_CONF.equals(cmd)) { handleShowConf(t, paramMap); return; } @@ -178,8 +175,7 @@ public class Httpserver implements Shutdownable { if (isChild == false) { LOG.error("Access one disallowed path: " + canonicalPath); - throw new IOException( - "Destination file/path is not accessible."); + throw new IOException("Destination file/path is not accessible."); } } @@ -196,34 +192,27 @@ public class Httpserver implements Shutdownable { return paramMap; } - private void handleShowLog(HttpExchange t, Map<String, String> paramMap) - throws IOException { + private void handleShowLog(HttpExchange t, Map<String, String> paramMap) throws IOException { Pair<Long, byte[]> logPair = queryLog(t, paramMap); if (logPair == null) { return; } - String size = - String.format( - HttpserverUtils.HTTPSERVER_LOGVIEW_PARAM_SIZE_FORMAT, - logPair.getFirst()); + String size = String.format(HttpserverUtils.HTTPSERVER_LOGVIEW_PARAM_SIZE_FORMAT, logPair.getFirst()); byte[] sizeByts = size.getBytes(); byte[] logData = logPair.getSecond(); - t.sendResponseHeaders(HttpURLConnection.HTTP_OK, sizeByts.length - + logData.length); + t.sendResponseHeaders(HttpURLConnection.HTTP_OK, sizeByts.length + logData.length); OutputStream os = t.getResponseBody(); os.write(sizeByts); os.write(logData); os.close(); } - private Pair<Long, byte[]> queryLog(HttpExchange t, - Map<String, String> paramMap) throws IOException { + private Pair<Long, byte[]> queryLog(HttpExchange t, Map<String, String> paramMap) throws IOException { - String fileParam = - paramMap.get(HttpserverUtils.HTTPSERVER_LOGVIEW_PARAM_LOGFILE); + String fileParam = paramMap.get(HttpserverUtils.HTTPSERVER_LOGVIEW_PARAM_LOGFILE); if (StringUtils.isBlank(fileParam)) { handlFailure(t, "Bad Request, Params Error, no log file name."); return null; @@ -242,8 +231,7 @@ public class Httpserver implements Shutdownable { long position = fileSize - pageSize; try { - String posStr = - paramMap.get(HttpserverUtils.HTTPSERVER_LOGVIEW_PARAM_POS); + String posStr = paramMap.get(HttpserverUtils.HTTPSERVER_LOGVIEW_PARAM_POS); if (StringUtils.isBlank(posStr) == false) { long pos = Long.valueOf(posStr); @@ -258,15 +246,12 @@ public class Httpserver implements Shutdownable { long size = Math.min(fileSize - position, pageSize); - LOG.info("logview " + logFile + ", position=" + position - + ", size=" + size); + LOG.info("logview " + logFile + ", position=" + position + ", size=" + size); fout = fc.map(FileChannel.MapMode.READ_ONLY, position, size); ret = new byte[(int) size]; fout.get(ret); - String str = - new String(ret, - ConfigExtension.getLogViewEncoding(conf)); + String str = new String(ret, ConfigExtension.getLogViewEncoding(conf)); return new Pair<Long, byte[]>(fileSize, str.getBytes()); } catch (FileNotFoundException e) { @@ -288,8 +273,7 @@ public class Httpserver implements Shutdownable { } byte[] getJSonFiles(String dir) throws Exception { - Map<String, FileAttribute> fileMap = - new HashMap<String, FileAttribute>(); + Map<String, FileAttribute> fileMap = new HashMap<String, FileAttribute>(); String path = logDir; if (dir != null) { @@ -332,13 +316,11 @@ public class Httpserver implements Shutdownable { return fileJsonStr.getBytes(); } - void handleListDir(HttpExchange t, Map<String, String> paramMap) - throws IOException { + void handleListDir(HttpExchange t, Map<String, String> paramMap) throws IOException { byte[] filesJson = "Failed to get file list".getBytes(); try { - String dir = - paramMap.get(HttpserverUtils.HTTPSERVER_LOGVIEW_PARAM_DIR); + String dir = paramMap.get(HttpserverUtils.HTTPSERVER_LOGVIEW_PARAM_DIR); filesJson = getJSonFiles(dir); } catch (Exception e) { LOG.error("Failed to list files", e); @@ -358,15 +340,12 @@ public class Httpserver implements Shutdownable { try { LOG.info("Begin to execute " + cmd); - Process process = - JStormUtils.launch_process(cmd, - new HashMap<String, String>(), false); + Process process = JStormUtils.launch_process(cmd, new HashMap<String, String>(), false); // Process process = Runtime.getRuntime().exec(sb.toString()); InputStream stdin = process.getInputStream(); - BufferedReader reader = - new BufferedReader(new InputStreamReader(stdin)); + BufferedReader reader = new BufferedReader(new InputStreamReader(stdin)); JStormUtils.sleepMs(1000); @@ -398,10 +377,8 @@ public class Httpserver implements Shutdownable { } } - void handleJstack(HttpExchange t, Map<String, String> paramMap) - throws IOException { - String workerPort = - paramMap.get(HttpserverUtils.HTTPSERVER_LOGVIEW_PARAM_WORKER_PORT); + void handleJstack(HttpExchange t, Map<String, String> paramMap) throws IOException { + String workerPort = paramMap.get(HttpserverUtils.HTTPSERVER_LOGVIEW_PARAM_WORKER_PORT); if (workerPort == null) { handlFailure(t, "Not set worker's port"); return; @@ -425,8 +402,7 @@ public class Httpserver implements Shutdownable { os.close(); } - void handleShowConf(HttpExchange t, Map<String, String> paramMap) - throws IOException { + void handleShowConf(HttpExchange t, Map<String, String> paramMap) throws IOException { byte[] json = "Failed to get configuration".getBytes(); try { @@ -452,8 +428,7 @@ public class Httpserver implements Shutdownable { try { hs = HttpServer.create(socketAddr, 0); - hs.createContext(HttpserverUtils.HTTPSERVER_CONTEXT_PATH_LOGVIEW, - new LogHandler(conf)); + hs.createContext(HttpserverUtils.HTTPSERVER_CONTEXT_PATH_LOGVIEW, new LogHandler(conf)); hs.setExecutor(executor); hs.start(); http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/SandBoxMaker.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/SandBoxMaker.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/SandBoxMaker.java index dfee522..8b52607 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/SandBoxMaker.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/SandBoxMaker.java @@ -50,8 +50,7 @@ import com.alibaba.jstorm.cluster.StormConfig; * @version */ public class SandBoxMaker { - private static final Logger LOG = LoggerFactory - .getLogger(SandBoxMaker.class); + private static final Logger LOG = LoggerFactory.getLogger(SandBoxMaker.class); public static final String SANBOX_TEMPLATE_NAME = "sandbox.policy"; @@ -66,8 +65,7 @@ public class SandBoxMaker { private final boolean isEnable; - private final Map<String, String> replaceBaseMap = - new HashMap<String, String>(); + private final Map<String, String> replaceBaseMap = new HashMap<String, String>(); public SandBoxMaker(Map conf) { this.conf = conf; @@ -83,8 +81,7 @@ public class SandBoxMaker { replaceBaseMap.put(JSTORM_HOME_KEY, jstormHome); - replaceBaseMap.put(LOCAL_DIR_KEY, - (String) conf.get(Config.STORM_LOCAL_DIR)); + replaceBaseMap.put(LOCAL_DIR_KEY, (String) conf.get(Config.STORM_LOCAL_DIR)); LOG.info("JSTORM_HOME is " + jstormHome); } @@ -127,26 +124,19 @@ public class SandBoxMaker { return line; } - public String generatePolicyFile(Map<String, String> replaceMap) - throws IOException { + public String generatePolicyFile(Map<String, String> replaceMap) throws IOException { // dynamic generate policy file, no static file - String tmpPolicy = - StormConfig.supervisorTmpDir(conf) + File.separator - + UUID.randomUUID().toString(); + String tmpPolicy = StormConfig.supervisorTmpDir(conf) + File.separator + UUID.randomUUID().toString(); - InputStream inputStream = - SandBoxMaker.class.getClassLoader().getResourceAsStream( - SANBOX_TEMPLATE_NAME); + InputStream inputStream = SandBoxMaker.class.getClassLoader().getResourceAsStream(SANBOX_TEMPLATE_NAME); - PrintWriter writer = - new PrintWriter(new BufferedWriter(new FileWriter(tmpPolicy))); + PrintWriter writer = new PrintWriter(new BufferedWriter(new FileWriter(tmpPolicy))); try { InputStreamReader inputReader = new InputStreamReader(inputStream); - BufferedReader reader = - new BufferedReader(new LineNumberReader(inputReader)); + BufferedReader reader = new BufferedReader(new LineNumberReader(inputReader)); String line = null; while ((line = reader.readLine()) != null) { @@ -177,8 +167,7 @@ public class SandBoxMaker { * @return * @throws IOException */ - public String sandboxPolicy(String workerId, Map<String, String> replaceMap) - throws IOException { + public String sandboxPolicy(String workerId, Map<String, String> replaceMap) throws IOException { if (isEnable == false) { return ""; } @@ -188,9 +177,7 @@ public class SandBoxMaker { String tmpPolicy = generatePolicyFile(replaceMap); File file = new File(tmpPolicy); - String policyPath = - StormConfig.worker_root(conf, workerId) + File.separator - + SANBOX_TEMPLATE_NAME; + String policyPath = StormConfig.worker_root(conf, workerId) + File.separator + SANBOX_TEMPLATE_NAME; File dest = new File(policyPath); file.renameTo(dest); @@ -210,9 +197,7 @@ public class SandBoxMaker { SandBoxMaker maker = new SandBoxMaker(conf); try { - System.out.println("sandboxPolicy:" - + maker.sandboxPolicy("simple", - new HashMap<String, String>())); + System.out.println("sandboxPolicy:" + maker.sandboxPolicy("simple", new HashMap<String, String>())); } catch (IOException e) { e.printStackTrace(); } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/ShutdownWork.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/ShutdownWork.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/ShutdownWork.java index 0b906e3..71859a1 100644 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/ShutdownWork.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/ShutdownWork.java @@ -37,6 +37,9 @@ import com.alibaba.jstorm.utils.JStormUtils; import com.alibaba.jstorm.utils.PathUtils; import com.alibaba.jstorm.utils.TimeUtils; +/** + * @author Johnfang ([email protected]) + */ public class ShutdownWork extends RunnableCallback { private static Logger LOG = LoggerFactory.getLogger(ShutdownWork.class); @@ -54,14 +57,9 @@ public class ShutdownWork extends RunnableCallback { * * @return the topologys whose workers are shutdown successfully */ - public void shutWorker(Map conf, String supervisorId, - Map<String, String> removed, - ConcurrentHashMap<String, String> workerThreadPids, - CgroupManager cgroupManager, boolean block, - Map<String, Integer> killingWorkers, - Map<String, Integer> taskCleanupTimeoutMap) { - Map<String, List<String>> workerId2Pids = - new HashMap<String, List<String>>(); + public void shutWorker(Map conf, String supervisorId, Map<String, String> removed, ConcurrentHashMap<String, String> workerThreadPids, + CgroupManager cgroupManager, boolean block, Map<String, Integer> killingWorkers, Map<String, Integer> taskCleanupTimeoutMap) { + Map<String, List<String>> workerId2Pids = new HashMap<String, List<String>>(); boolean localMode = false; @@ -78,8 +76,7 @@ public class ShutdownWork extends RunnableCallback { try { pids = getPid(conf, workerId); } catch (IOException e1) { - LOG.error("Failed to get pid for " + workerId + " of " - + topologyId); + LOG.error("Failed to get pid for " + workerId + " of " + topologyId); } workerId2Pids.put(workerId, pids); @@ -100,15 +97,10 @@ public class ShutdownWork extends RunnableCallback { JStormUtils.process_killed(Integer.parseInt(pid)); } - if (taskCleanupTimeoutMap != null - && taskCleanupTimeoutMap.get(topologyId) != null) { - maxWaitTime = - Math.max(maxWaitTime, - taskCleanupTimeoutMap.get(topologyId)); + if (taskCleanupTimeoutMap != null && taskCleanupTimeoutMap.get(topologyId) != null) { + maxWaitTime = Math.max(maxWaitTime, taskCleanupTimeoutMap.get(topologyId)); } else { - maxWaitTime = - Math.max(maxWaitTime, ConfigExtension - .getTaskCleanupTimeoutSec(conf)); + maxWaitTime = Math.max(maxWaitTime, ConfigExtension.getTaskCleanupTimeoutSec(conf)); } } catch (Exception e) { LOG.info("Failed to shutdown ", e); @@ -126,8 +118,7 @@ public class ShutdownWork extends RunnableCallback { List<String> pids = workerId2Pids.get(workerId); int cleanupTimeout; - if (taskCleanupTimeoutMap != null - && taskCleanupTimeoutMap.get(topologyId) != null) { + if (taskCleanupTimeoutMap != null && taskCleanupTimeoutMap.get(topologyId) != null) { cleanupTimeout = taskCleanupTimeoutMap.get(topologyId); } else { cleanupTimeout = ConfigExtension.getTaskCleanupTimeoutSec(conf); @@ -137,8 +128,7 @@ public class ShutdownWork extends RunnableCallback { if (TimeUtils.current_time_secs() - initCleaupTime > cleanupTimeout) { if (localMode == false) { for (String pid : pids) { - JStormUtils - .ensure_process_killed(Integer.parseInt(pid)); + JStormUtils.ensure_process_killed(Integer.parseInt(pid)); if (cgroupManager != null) { cgroupManager.shutDownWorker(workerId, true); } @@ -169,14 +159,12 @@ public class ShutdownWork extends RunnableCallback { // delete workerid dir, LOCAL_DIR/worker/workerid PathUtils.rmr(StormConfig.worker_root(conf, workerId)); } catch (Exception e) { - LOG.warn(e + "Failed to cleanup worker " + workerId - + ". Will retry later"); + LOG.warn(e + "Failed to cleanup worker " + workerId + ". Will retry later"); } } /** - * When worker has been started by manually and supervisor, it will return - * multiple pid + * When worker has been started by manually and supervisor, it will return multiple pid * * @param conf * @param workerId http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/StateHeartbeat.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/StateHeartbeat.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/StateHeartbeat.java index c159f4b..c6bed45a 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/StateHeartbeat.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/StateHeartbeat.java @@ -47,7 +47,6 @@ public class StateHeartbeat { @Override public String toString() { - return ToStringBuilder.reflectionToString(this, - ToStringStyle.SHORT_PREFIX_STYLE); + return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE); } } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/Supervisor.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/Supervisor.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/Supervisor.java index abc2448..c6c2877 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/Supervisor.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/Supervisor.java @@ -24,6 +24,7 @@ import java.util.Vector; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; +import com.alibaba.jstorm.daemon.worker.WorkerReportError; import org.apache.commons.io.FileUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,33 +52,28 @@ import com.alibaba.jstorm.utils.JStormUtils; * * Supevisor workflow 1. write SupervisorInfo to ZK * - * 2. Every 10 seconds run SynchronizeSupervisor 2.1 download new topology 2.2 - * release useless worker 2.3 assgin new task to - * /local-dir/supervisor/localstate 2.4 add one syncProcesses event + * 2. Every 10 seconds run SynchronizeSupervisor 2.1 download new topology 2.2 release useless worker 2.3 assgin new task to /local-dir/supervisor/localstate + * 2.4 add one syncProcesses event * - * 3. Every supervisor.monitor.frequency.secs run SyncProcesses 3.1 kill useless - * worker 3.2 start new worker + * 3. Every supervisor.monitor.frequency.secs run SyncProcesses 3.1 kill useless worker 3.2 start new worker * - * 4. create heartbeat thread every supervisor.heartbeat.frequency.secs, write - * SupervisorInfo to ZK + * 4. create heartbeat thread every supervisor.heartbeat.frequency.secs, write SupervisorInfo to ZK + * @author Johnfang ([email protected]) */ public class Supervisor { private static Logger LOG = LoggerFactory.getLogger(Supervisor.class); - /** * create and start one supervisor * * @param conf : configurationdefault.yaml storm.yaml * @param sharedContext : null (right now) - * @return SupervisorManger: which is used to shutdown all workers and - * supervisor + * @return SupervisorManger: which is used to shutdown all workers and supervisor */ @SuppressWarnings("rawtypes") - public SupervisorManger mkSupervisor(Map conf, IContext sharedContext) - throws Exception { + public SupervisorManger mkSupervisor(Map conf, IContext sharedContext) throws Exception { LOG.info("Starting Supervisor with conf " + conf); @@ -91,13 +87,15 @@ public class Supervisor { * Step 2: create ZK operation instance StromClusterState */ - StormClusterState stormClusterState = - Cluster.mk_storm_cluster_state(conf); + StormClusterState stormClusterState = Cluster.mk_storm_cluster_state(conf); + + String hostName = JStormServerUtils.getHostName(conf); + WorkerReportError workerReportError = + new WorkerReportError(stormClusterState, hostName); + /* - * Step 3, create LocalStat LocalStat is one KV database 4.1 create - * LocalState instance; 4.2 get supervisorId, if no supervisorId, create - * one + * Step 3, create LocalStat LocalStat is one KV database 4.1 create LocalState instance; 4.2 get supervisorId, if no supervisorId, create one */ LocalState localState = StormConfig.supervisorState(conf); @@ -115,13 +113,11 @@ public class Supervisor { // sync hearbeat to nimbus Heartbeat hb = new Heartbeat(conf, stormClusterState, supervisorId); hb.update(); - AsyncLoopThread heartbeat = - new AsyncLoopThread(hb, false, null, Thread.MIN_PRIORITY, true); + AsyncLoopThread heartbeat = new AsyncLoopThread(hb, false, null, Thread.MIN_PRIORITY, true); threads.add(heartbeat); // Sync heartbeat to Apsara Container - AsyncLoopThread syncContainerHbThread = - SyncContainerHb.mkSupervisorInstance(conf); + AsyncLoopThread syncContainerHbThread = SyncContainerHb.mkSupervisorInstance(conf); if (syncContainerHbThread != null) { threads.add(syncContainerHbThread); } @@ -129,34 +125,22 @@ public class Supervisor { // Step 6 create and start sync Supervisor thread // every supervisor.monitor.frequency.secs second run SyncSupervisor EventManagerImp processEventManager = new EventManagerImp(); - AsyncLoopThread processEventThread = - new AsyncLoopThread(processEventManager); + AsyncLoopThread processEventThread = new AsyncLoopThread(processEventManager); threads.add(processEventThread); - ConcurrentHashMap<String, String> workerThreadPids = - new ConcurrentHashMap<String, String>(); - SyncProcessEvent syncProcessEvent = - new SyncProcessEvent(supervisorId, conf, localState, - workerThreadPids, sharedContext); + ConcurrentHashMap<String, String> workerThreadPids = new ConcurrentHashMap<String, String>(); + SyncProcessEvent syncProcessEvent = new SyncProcessEvent(supervisorId, conf, localState, workerThreadPids, sharedContext, workerReportError); EventManagerImp syncSupEventManager = new EventManagerImp(); - AsyncLoopThread syncSupEventThread = - new AsyncLoopThread(syncSupEventManager); + AsyncLoopThread syncSupEventThread = new AsyncLoopThread(syncSupEventManager); threads.add(syncSupEventThread); SyncSupervisorEvent syncSupervisorEvent = - new SyncSupervisorEvent(supervisorId, conf, - processEventManager, syncSupEventManager, - stormClusterState, localState, syncProcessEvent, hb); - - int syncFrequence = - JStormUtils.parseInt(conf - .get(Config.SUPERVISOR_MONITOR_FREQUENCY_SECS)); - EventManagerPusher syncSupervisorPusher = - new EventManagerPusher(syncSupEventManager, - syncSupervisorEvent, syncFrequence); - AsyncLoopThread syncSupervisorThread = - new AsyncLoopThread(syncSupervisorPusher); + new SyncSupervisorEvent(supervisorId, conf, processEventManager, syncSupEventManager, stormClusterState, localState, syncProcessEvent, hb); + + int syncFrequence = JStormUtils.parseInt(conf.get(Config.SUPERVISOR_MONITOR_FREQUENCY_SECS)); + EventManagerPusher syncSupervisorPusher = new EventManagerPusher(syncSupEventManager, syncSupervisorEvent, syncFrequence); + AsyncLoopThread syncSupervisorThread = new AsyncLoopThread(syncSupervisorPusher); threads.add(syncSupervisorThread); Httpserver httpserver = null; @@ -168,9 +152,7 @@ public class Supervisor { } // SupervisorManger which can shutdown all supervisor and workers - return new SupervisorManger(conf, supervisorId, threads, - syncSupEventManager, processEventManager, httpserver, - stormClusterState, workerThreadPids); + return new SupervisorManger(conf, supervisorId, threads, syncSupEventManager, processEventManager, httpserver, stormClusterState, workerThreadPids); } /** @@ -210,7 +192,7 @@ public class Supervisor { JStormUtils.redirectOutput("/dev/null"); initShutdownHook(supervisorManager); - + while (supervisorManager.isFinishShutdown() == false) { try { Thread.sleep(1000); @@ -222,11 +204,10 @@ public class Supervisor { } catch (Exception e) { LOG.error("Failed to start supervisor\n", e); System.exit(1); - }finally { - LOG.info("Shutdown supervisor!!!"); + } finally { + LOG.info("Shutdown supervisor!!!"); } - } /** http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/SupervisorInfo.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/SupervisorInfo.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/SupervisorInfo.java index f53ef72..ae89607 100644 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/SupervisorInfo.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/SupervisorInfo.java @@ -29,7 +29,7 @@ import org.apache.commons.lang.builder.ToStringStyle; /** * Object stored in ZK /ZK-DIR/supervisors - * + * * @author Xin.Zhou/Longda */ public class SupervisorInfo implements Serializable { @@ -46,8 +46,7 @@ public class SupervisorInfo implements Serializable { private transient Set<Integer> availableWorkerPorts; - public SupervisorInfo(String hostName, String supervisorId, - Set<Integer> workerPorts) { + public SupervisorInfo(String hostName, String supervisorId, Set<Integer> workerPorts) { this.hostName = hostName; this.supervisorId = supervisorId; this.workerPorts = workerPorts; @@ -80,16 +79,19 @@ public class SupervisorInfo implements Serializable { public Set<Integer> getWorkerPorts() { return workerPorts; } - public void setAvailableWorkerPorts(Set<Integer> workerPorts){ + + public void setAvailableWorkerPorts(Set<Integer> workerPorts) { if (availableWorkerPorts == null) availableWorkerPorts = new HashSet<Integer>(); availableWorkerPorts.addAll(workerPorts); } + public Set<Integer> getAvailableWorkerPorts() { if (availableWorkerPorts == null) availableWorkerPorts = new HashSet<Integer>(); return availableWorkerPorts; } + public void setWorkerPorts(Set<Integer> workerPorts) { this.workerPorts = workerPorts; } @@ -98,20 +100,11 @@ public class SupervisorInfo implements Serializable { public int hashCode() { final int prime = 31; int result = 1; - result = - prime * result + ((hostName == null) ? 0 : hostName.hashCode()); - result = - prime - * result - + ((supervisorId == null) ? 0 : supervisorId.hashCode()); - result = - prime * result + ((timeSecs == null) ? 0 : timeSecs.hashCode()); - result = - prime * result - + ((uptimeSecs == null) ? 0 : uptimeSecs.hashCode()); - result = - prime * result - + ((workerPorts == null) ? 0 : workerPorts.hashCode()); + result = prime * result + ((hostName == null) ? 0 : hostName.hashCode()); + result = prime * result + ((supervisorId == null) ? 0 : supervisorId.hashCode()); + result = prime * result + ((timeSecs == null) ? 0 : timeSecs.hashCode()); + result = prime * result + ((uptimeSecs == null) ? 0 : uptimeSecs.hashCode()); + result = prime * result + ((workerPorts == null) ? 0 : workerPorts.hashCode()); return result; } @@ -154,19 +147,17 @@ public class SupervisorInfo implements Serializable { @Override public String toString() { - return ToStringBuilder.reflectionToString(this, - ToStringStyle.SHORT_PREFIX_STYLE); + return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE); } /** * get Map<supervisorId, hostname> - * + * * @param stormClusterState * @param callback * @return */ - public static Map<String, String> getNodeHost( - Map<String, SupervisorInfo> supInfos) { + public static Map<String, String> getNodeHost(Map<String, SupervisorInfo> supInfos) { Map<String, String> rtn = new HashMap<String, String>(); http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/SupervisorManger.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/SupervisorManger.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/SupervisorManger.java index a2806de..99c2c76 100644 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/SupervisorManger.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/SupervisorManger.java @@ -39,9 +39,9 @@ import com.alibaba.jstorm.utils.PathUtils; /** * supervisor shutdown manager which can shutdown supervisor + * @author Johnfang ([email protected]) */ -public class SupervisorManger extends ShutdownWork implements SupervisorDaemon, - DaemonCommon, Runnable { +public class SupervisorManger extends ShutdownWork implements SupervisorDaemon, DaemonCommon, Runnable { private static Logger LOG = LoggerFactory.getLogger(SupervisorManger.class); @@ -67,11 +67,8 @@ public class SupervisorManger extends ShutdownWork implements SupervisorDaemon, private volatile boolean isFinishShutdown = false; - public SupervisorManger(Map conf, String supervisorId, - Vector<AsyncLoopThread> threads, - EventManager processesEventManager, EventManager eventManager, - Httpserver httpserver, StormClusterState stormClusterState, - ConcurrentHashMap<String, String> workerThreadPidsAtom) { + public SupervisorManger(Map conf, String supervisorId, Vector<AsyncLoopThread> threads, EventManager processesEventManager, EventManager eventManager, + Httpserver httpserver, StormClusterState stormClusterState, ConcurrentHashMap<String, String> workerThreadPidsAtom) { this.conf = conf; this.supervisorId = supervisorId; this.shutdown = new AtomicBoolean(false); @@ -104,8 +101,7 @@ public class SupervisorManger extends ShutdownWork implements SupervisorDaemon, // } catch (InterruptedException e) { // LOG.error(e.getMessage(), e); // } - LOG.info("Successfully shutdown thread:" - + thread.getThread().getName()); + LOG.info("Successfully shutdown thread:" + thread.getThread().getName()); } eventManager.shutdown(); processesEventManager.shutdown(); @@ -144,15 +140,13 @@ public class SupervisorManger extends ShutdownWork implements SupervisorDaemon, return; } List<String> myWorkerIds = PathUtils.read_dir_contents(path); - HashMap<String, String> workerId2topologyIds = - new HashMap<String, String>(); + HashMap<String, String> workerId2topologyIds = new HashMap<String, String>(); for (String workerId : myWorkerIds) { workerId2topologyIds.put(workerId, null); } - shutWorker(conf, supervisorId, workerId2topologyIds, - workerThreadPidsAtom, null, true, null, null); + shutWorker(conf, supervisorId, workerId2topologyIds, workerThreadPidsAtom, null, true, null, null); } @Override http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/SyncProcessEvent.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/SyncProcessEvent.java b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/SyncProcessEvent.java index d90eb29..01f2a3a 100644 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/SyncProcessEvent.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/daemon/supervisor/SyncProcessEvent.java @@ -33,6 +33,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.regex.Pattern; +import com.alibaba.jstorm.daemon.worker.*; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,12 +46,6 @@ import backtype.storm.utils.LocalState; import com.alibaba.jstorm.client.ConfigExtension; import com.alibaba.jstorm.cluster.Common; import com.alibaba.jstorm.cluster.StormConfig; -import com.alibaba.jstorm.daemon.worker.LocalAssignment; -import com.alibaba.jstorm.daemon.worker.ProcessSimulator; -import com.alibaba.jstorm.daemon.worker.State; -import com.alibaba.jstorm.daemon.worker.Worker; -import com.alibaba.jstorm.daemon.worker.WorkerHeartbeat; -import com.alibaba.jstorm.daemon.worker.WorkerShutdown; import com.alibaba.jstorm.utils.JStormUtils; import com.alibaba.jstorm.utils.Pair; import com.alibaba.jstorm.utils.PathUtils; @@ -59,6 +54,7 @@ import com.alibaba.jstorm.utils.TimeUtils; /** * SyncProcesses (1) kill bad worker (2) start new worker + * @author Johnfang ([email protected]) */ class SyncProcessEvent extends ShutdownWork { private static Logger LOG = LoggerFactory.getLogger(SyncProcessEvent.class); @@ -78,8 +74,7 @@ class SyncProcessEvent extends ShutdownWork { private SandBoxMaker sandBoxMaker; /** - * Due to the worker startTime is put in Supervisor memory, When supervisor - * restart, the starting worker is likely to be killed + * Due to the worker startTime is put in Supervisor memory, When supervisor restart, the starting worker is likely to be killed */ private Map<String, Pair<Integer, Integer>> workerIdToStartTimeAndPort; /** @@ -95,6 +90,8 @@ class SyncProcessEvent extends ShutdownWork { // private Supervisor supervisor; private int lastTime; + private WorkerReportError workerReportError; + /** * @param conf * @param localState @@ -104,10 +101,8 @@ class SyncProcessEvent extends ShutdownWork { * @param workerThreadPidsReadLock * @param workerThreadPidsWriteLock */ - public SyncProcessEvent(String supervisorId, Map conf, - LocalState localState, - ConcurrentHashMap<String, String> workerThreadPids, - IContext sharedContext) { + public SyncProcessEvent(String supervisorId, Map conf, LocalState localState, ConcurrentHashMap<String, String> workerThreadPids, + IContext sharedContext, WorkerReportError workerReportError) { this.supervisorId = supervisorId; @@ -122,8 +117,7 @@ class SyncProcessEvent extends ShutdownWork { this.sandBoxMaker = new SandBoxMaker(conf); - this.workerIdToStartTimeAndPort = - new HashMap<String, Pair<Integer, Integer>>(); + this.workerIdToStartTimeAndPort = new HashMap<String, Pair<Integer, Integer>>(); this.needDownloadTopologys = new AtomicReference<Set>(); @@ -132,30 +126,27 @@ class SyncProcessEvent extends ShutdownWork { } killingWorkers = new HashMap<String, Integer>(); + this.workerReportError = workerReportError; } /** - * @@@ Change the old logic In the old logic, it will store - * LS_LOCAL_ASSIGNMENTS Map<String, Integer> into LocalState - * - * But I don't think LS_LOCAL_ASSIGNMENTS is useful, so remove this - * logic + * @@@ Change the old logic In the old logic, it will store LS_LOCAL_ASSIGNMENTS Map<String, Integer> into LocalState + * + * But I don't think LS_LOCAL_ASSIGNMENTS is useful, so remove this logic */ @SuppressWarnings("unchecked") @Override public void run() { - + } - public void run(Map<Integer, LocalAssignment> localAssignments) { - LOG.debug("Syncing processes, interval seconds:" - + TimeUtils.time_delta(lastTime)); + public void run(Map<Integer, LocalAssignment> localAssignments, Set<String> downloadFailedTopologyIds ) { + LOG.debug("Syncing processes, interval seconds:" + TimeUtils.time_delta(lastTime)); lastTime = TimeUtils.current_time_secs(); try { /** - * Step 1: get assigned tasks from localstat Map<port(type Integer), - * LocalAssignment> + * Step 1: get assigned tasks from localstat Map<port(type Integer), LocalAssignment> */ if (localAssignments == null) { localAssignments = new HashMap<Integer, LocalAssignment>(); @@ -163,13 +154,11 @@ class SyncProcessEvent extends ShutdownWork { LOG.debug("Assigned tasks: " + localAssignments); /** - * Step 2: get local WorkerStats from local_dir/worker/ids/heartbeat - * Map<workerid [WorkerHeartbeat, state]> + * Step 2: get local WorkerStats from local_dir/worker/ids/heartbeat Map<workerid [WorkerHeartbeat, state]> */ Map<String, StateHeartbeat> localWorkerStats = null; try { - localWorkerStats = - getLocalWorkerStats(conf, localState, localAssignments); + localWorkerStats = getLocalWorkerStats(conf, localState, localAssignments); } catch (Exception e) { LOG.error("Failed to get Local worker stats"); throw e; @@ -177,20 +166,14 @@ class SyncProcessEvent extends ShutdownWork { LOG.debug("Allocated: " + localWorkerStats); /** - * Step 3: kill Invalid Workers and remove killed worker from - * localWorkerStats + * Step 3: kill Invalid Workers and remove killed worker from localWorkerStats */ Map<String, Integer> taskCleaupTimeoutMap = null; Set<Integer> keepPorts = null; try { - taskCleaupTimeoutMap = - (Map<String, Integer>) localState - .get(Common.LS_TASK_CLEANUP_TIMEOUT); - keepPorts = - killUselessWorkers(localWorkerStats, localAssignments, - taskCleaupTimeoutMap); - localState.put(Common.LS_TASK_CLEANUP_TIMEOUT, - taskCleaupTimeoutMap); + taskCleaupTimeoutMap = (Map<String, Integer>) localState.get(Common.LS_TASK_CLEANUP_TIMEOUT); + keepPorts = killUselessWorkers(localWorkerStats, localAssignments, taskCleaupTimeoutMap); + localState.put(Common.LS_TASK_CLEANUP_TIMEOUT, taskCleaupTimeoutMap); } catch (IOException e) { LOG.error("Failed to kill workers", e); } @@ -202,7 +185,7 @@ class SyncProcessEvent extends ShutdownWork { checkNeedUpdateTopologys(localWorkerStats, localAssignments); // start new workers - startNewWorkers(keepPorts, localAssignments); + startNewWorkers(keepPorts, localAssignments, downloadFailedTopologyIds); } catch (Exception e) { LOG.error("Failed Sync Process", e); @@ -215,14 +198,13 @@ class SyncProcessEvent extends ShutdownWork { * check all workers is failed or not */ @SuppressWarnings("unchecked") - public void checkNeedUpdateTopologys( - Map<String, StateHeartbeat> localWorkerStats, - Map<Integer, LocalAssignment> localAssignments) throws Exception { + public void checkNeedUpdateTopologys(Map<String, StateHeartbeat> localWorkerStats, Map<Integer, LocalAssignment> localAssignments) throws Exception { Set<String> topologys = new HashSet<String>(); + Map<String, Long> topologyAssignTimeStamps = new HashMap<String, Long>(); - for (Map.Entry<Integer, LocalAssignment> entry : localAssignments - .entrySet()) { + for (Entry<Integer, LocalAssignment> entry : localAssignments.entrySet()) { topologys.add(entry.getValue().getTopologyId()); + topologyAssignTimeStamps.put(entry.getValue().getTopologyId(), entry.getValue().getTimeStamp()); } for (StateHeartbeat stateHb : localWorkerStats.values()) { @@ -236,32 +218,27 @@ class SyncProcessEvent extends ShutdownWork { Set<String> needRemoveTopologys = new HashSet<String>(); for (String topologyId : topologys) { try { - long lastModifytime = - StormConfig.get_supervisor_topology_Bianrymodify_time( - conf, topologyId); - if ((currTime - lastModifytime) / 1000 < (JStormUtils.MIN_1 * 2)) { + long newAssignTime = topologyAssignTimeStamps.get(topologyId); + if ((currTime - newAssignTime) / 1000 < (JStormUtils.MIN_1 * 2)) { LOG.debug("less 2 minite ,so removed " + topologyId); needRemoveTopologys.add(topologyId); } } catch (Exception e) { - LOG.error( - "Failed to get the time of file last modification for topology" - + topologyId, e); + LOG.error("Failed to get the time of file last modification for topology" + topologyId, e); needRemoveTopologys.add(topologyId); } } topologys.removeAll(needRemoveTopologys); if (topologys.size() > 0) { - LOG.debug("Following topologys is going to re-download the jars, " - + topologys); + LOG.debug("Following topologys is going to re-download the jars, " + topologys); } needDownloadTopologys.set(topologys); } /** * mark all new Workers - * + * * @param workerIds * @pdOid 52b11418-7474-446d-bff5-0ecd68f4954f */ @@ -271,40 +248,32 @@ class SyncProcessEvent extends ShutdownWork { for (Entry<Integer, String> entry : workerIds.entrySet()) { String oldWorkerIds = portToWorkerId.get(entry.getKey()); - if(oldWorkerIds != null){ + if (oldWorkerIds != null) { workerIdToStartTimeAndPort.remove(oldWorkerIds); // update portToWorkerId - LOG.info("exit port is still occupied by old wokerId, so remove unuseful " + - oldWorkerIds+ " form workerIdToStartTimeAndPort"); + LOG.info("exit port is still occupied by old wokerId, so remove unuseful " + oldWorkerIds + " form workerIdToStartTimeAndPort"); } portToWorkerId.put(entry.getKey(), entry.getValue()); - workerIdToStartTimeAndPort.put(entry.getValue(), - new Pair<Integer, Integer>(startTime, entry.getKey())); + workerIdToStartTimeAndPort.put(entry.getValue(), new Pair<Integer, Integer>(startTime, entry.getKey())); } } /** - * check new workers if the time is not > * - * SUPERVISOR_WORKER_START_TIMEOUT_SECS, otherwise info failed - * + * check new workers if the time is not > * SUPERVISOR_WORKER_START_TIMEOUT_SECS, otherwise info failed + * * @param conf * @pdOid f0a6ab43-8cd3-44e1-8fd3-015a2ec51c6a */ - public void checkNewWorkers(Map conf) throws IOException, - InterruptedException { + public void checkNewWorkers(Map conf) throws IOException, InterruptedException { Set<String> workers = new HashSet<String>(); - for (Entry<String, Pair<Integer, Integer>> entry : workerIdToStartTimeAndPort - .entrySet()) { + for (Entry<String, Pair<Integer, Integer>> entry : workerIdToStartTimeAndPort.entrySet()) { String workerId = entry.getKey(); int startTime = entry.getValue().getFirst(); LocalState ls = StormConfig.worker_state(conf, workerId); - WorkerHeartbeat whb = - (WorkerHeartbeat) ls.get(Common.LS_WORKER_HEARTBEAT); + WorkerHeartbeat whb = (WorkerHeartbeat) ls.get(Common.LS_WORKER_HEARTBEAT); if (whb == null) { - if ((TimeUtils.current_time_secs() - startTime) < JStormUtils - .parseInt(conf - .get(Config.SUPERVISOR_WORKER_START_TIMEOUT_SECS))) { + if ((TimeUtils.current_time_secs() - startTime) < JStormUtils.parseInt(conf.get(Config.SUPERVISOR_WORKER_START_TIMEOUT_SECS))) { LOG.info(workerId + " still hasn't started"); } else { LOG.error("Failed to start Worker " + workerId); @@ -321,15 +290,15 @@ class SyncProcessEvent extends ShutdownWork { this.portToWorkerId.remove(port); } } - public Map<Integer, String> getPortToWorkerId(){ + + public Map<Integer, String> getPortToWorkerId() { return portToWorkerId; } /** * get localstat approved workerId's map - * - * @return Map<workerid [workerheart, state]> [workerheart, state] is also a - * map, key is "workheartbeat" and "state" + * + * @return Map<workerid [workerheart, state]> [workerheart, state] is also a map, key is "workheartbeat" and "state" * @param conf * @param localState * @param assignedTasks @@ -337,22 +306,17 @@ class SyncProcessEvent extends ShutdownWork { * @pdOid 11c9bebb-d082-4c51-b323-dd3d5522a649 */ @SuppressWarnings("unchecked") - public Map<String, StateHeartbeat> getLocalWorkerStats(Map conf, - LocalState localState, Map<Integer, LocalAssignment> assignedTasks) - throws Exception { + public Map<String, StateHeartbeat> getLocalWorkerStats(Map conf, LocalState localState, Map<Integer, LocalAssignment> assignedTasks) throws Exception { - Map<String, StateHeartbeat> workeridHbstate = - new HashMap<String, StateHeartbeat>(); + Map<String, StateHeartbeat> workeridHbstate = new HashMap<String, StateHeartbeat>(); int now = TimeUtils.current_time_secs(); /** - * Get Map<workerId, WorkerHeartbeat> from - * local_dir/worker/ids/heartbeat + * Get Map<workerId, WorkerHeartbeat> from local_dir/worker/ids/heartbeat */ Map<String, WorkerHeartbeat> idToHeartbeat = readWorkerHeartbeats(conf); - for (Map.Entry<String, WorkerHeartbeat> entry : idToHeartbeat - .entrySet()) { + for (Entry<String, WorkerHeartbeat> entry : idToHeartbeat.entrySet()) { String workerid = entry.getKey().toString(); @@ -366,10 +330,9 @@ class SyncProcessEvent extends ShutdownWork { if (timeToPort != null) { LocalAssignment localAssignment = assignedTasks.get(timeToPort.getSecond()); if (localAssignment == null) { - LOG.info("Following worker don't exit assignment, so remove this port=" - + timeToPort.getSecond()); + LOG.info("Following worker don't exit assignment, so remove this port=" + timeToPort.getSecond()); state = State.disallowed; - //workerId is disallowed ,so remove it from workerIdToStartTimeAndPort + // workerId is disallowed ,so remove it from workerIdToStartTimeAndPort Integer port = this.workerIdToStartTimeAndPort.get(workerid).getSecond(); this.workerIdToStartTimeAndPort.remove(workerid); this.portToWorkerId.remove(port); @@ -381,12 +344,21 @@ class SyncProcessEvent extends ShutdownWork { // isn't assigned task state = State.disallowed; - } else if ((now - whb.getTimeSecs()) > JStormUtils.parseInt(conf - .get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS))) {// + } else if ((now - whb.getTimeSecs()) > JStormUtils.parseInt(conf.get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS))) { + if (killingWorkers.containsKey(workerid) == false) { + String outTimeInfo = " it is likely to be out of memory, the worker is time out "; + workerReportError.report(whb.getTopologyId(), whb.getPort(), + whb.getTaskIds(), outTimeInfo); + } state = State.timedOut; } else { if (isWorkerDead(workerid)) { + if (killingWorkers.containsKey(workerid) == false){ + String workeDeadInfo = "Worker is dead "; + workerReportError.report(whb.getTopologyId(), whb.getPort(), + whb.getTaskIds(), workeDeadInfo); + } state = State.timedOut; } else { state = State.valid; @@ -395,13 +367,10 @@ class SyncProcessEvent extends ShutdownWork { if (state != State.valid) { if (killingWorkers.containsKey(workerid) == false) - LOG.info("Worker:" + workerid + " state:" + state - + " WorkerHeartbeat:" + whb + " assignedTasks:" - + assignedTasks + " at supervisor time-secs " + now); + LOG.info("Worker:" + workerid + " state:" + state + " WorkerHeartbeat:" + whb + " assignedTasks:" + assignedTasks + + " at supervisor time-secs " + now); } else { - LOG.debug("Worker:" + workerid + " state:" + state - + " WorkerHeartbeat: " + whb - + " at supervisor time-secs " + now); + LOG.debug("Worker:" + workerid + " state:" + state + " WorkerHeartbeat: " + whb + " at supervisor time-secs " + now); } workeridHbstate.put(workerid, new StateHeartbeat(state, whb)); @@ -412,32 +381,26 @@ class SyncProcessEvent extends ShutdownWork { /** * check whether the workerheartbeat is allowed in the assignedTasks - * + * * @param whb : WorkerHeartbeat * @param assignedTasks - * @return boolean if true, the assignments(LS-LOCAL-ASSIGNMENTS) is match - * with workerheart if fasle, is not matched + * @return boolean if true, the assignments(LS-LOCAL-ASSIGNMENTS) is match with workerheart if fasle, is not matched */ - public boolean matchesAssignment(WorkerHeartbeat whb, - Map<Integer, LocalAssignment> assignedTasks) { + public boolean matchesAssignment(WorkerHeartbeat whb, Map<Integer, LocalAssignment> assignedTasks) { boolean isMatch = true; LocalAssignment localAssignment = assignedTasks.get(whb.getPort()); if (localAssignment == null) { - LOG.debug("Following worker has been removed, port=" - + whb.getPort() + ", assignedTasks=" + assignedTasks); + LOG.debug("Following worker has been removed, port=" + whb.getPort() + ", assignedTasks=" + assignedTasks); isMatch = false; } else if (!whb.getTopologyId().equals(localAssignment.getTopologyId())) { // topology id not equal - LOG.info("topology id not equal whb=" + whb.getTopologyId() - + ",localAssignment=" + localAssignment.getTopologyId()); + LOG.info("topology id not equal whb=" + whb.getTopologyId() + ",localAssignment=" + localAssignment.getTopologyId()); isMatch = false; }/* - * else if (!(whb.getTaskIds().equals(localAssignment.getTaskIds()))) { - * // task-id isn't equal LOG.info("task-id isn't equal whb=" + - * whb.getTaskIds() + ",localAssignment=" + - * localAssignment.getTaskIds()); isMatch = false; } + * else if (!(whb.getTaskIds().equals(localAssignment.getTaskIds()))) { // task-id isn't equal LOG.info("task-id isn't equal whb=" + whb.getTaskIds() + + * ",localAssignment=" + localAssignment.getTaskIds()); isMatch = false; } */ return isMatch; @@ -445,17 +408,15 @@ class SyncProcessEvent extends ShutdownWork { /** * get all workers heartbeats of the supervisor - * + * * @param conf * @return Map<workerId, WorkerHeartbeat> * @throws IOException * @throws IOException */ - public Map<String, WorkerHeartbeat> readWorkerHeartbeats(Map conf) - throws Exception { + public Map<String, WorkerHeartbeat> readWorkerHeartbeats(Map conf) throws Exception { - Map<String, WorkerHeartbeat> workerHeartbeats = - new HashMap<String, WorkerHeartbeat>(); + Map<String, WorkerHeartbeat> workerHeartbeats = new HashMap<String, WorkerHeartbeat>(); // get the path: STORM-LOCAL-DIR/workers String path = StormConfig.worker_root(conf); @@ -480,20 +441,19 @@ class SyncProcessEvent extends ShutdownWork { /** * get worker heartbeat by workerid - * + * * @param conf * @param workerId * @returns WorkerHeartbeat * @throws IOException */ - public WorkerHeartbeat readWorkerHeartbeat(Map conf, String workerId) - throws Exception { + public WorkerHeartbeat readWorkerHeartbeat(Map conf, String workerId) throws Exception { try { LocalState ls = StormConfig.worker_state(conf, workerId); return (WorkerHeartbeat) ls.get(Common.LS_WORKER_HEARTBEAT); - } catch (IOException e) { + } catch (Exception e) { LOG.error("Failed to get worker Heartbeat", e); return null; } @@ -502,7 +462,7 @@ class SyncProcessEvent extends ShutdownWork { /** * launch a worker in local mode - * + * * @param conf * @param sharedcontext * @param topologyId @@ -512,17 +472,12 @@ class SyncProcessEvent extends ShutdownWork { * @param workerThreadPidsAtom * @throws Exception */ - public void launchWorker(Map conf, IContext sharedcontext, - String topologyId, String supervisorId, Integer port, - String workerId, - ConcurrentHashMap<String, String> workerThreadPidsAtom) - throws Exception { + public void launchWorker(Map conf, IContext sharedcontext, String topologyId, String supervisorId, Integer port, String workerId, + ConcurrentHashMap<String, String> workerThreadPidsAtom) throws Exception { String pid = UUID.randomUUID().toString(); - WorkerShutdown worker = - Worker.mk_worker(conf, sharedcontext, topologyId, supervisorId, - port, workerId, null); + WorkerShutdown worker = Worker.mk_worker(conf, sharedcontext, topologyId, supervisorId, port, workerId, null); ProcessSimulator.registerProcess(pid, worker); @@ -534,13 +489,11 @@ class SyncProcessEvent extends ShutdownWork { private Set<String> setFilterJars(Map totalConf) { Set<String> filterJars = new HashSet<String>(); - boolean enableClassloader = - ConfigExtension.isEnableTopologyClassLoader(totalConf); + boolean enableClassloader = ConfigExtension.isEnableTopologyClassLoader(totalConf); if (enableClassloader == false) { // avoid logback vs log4j conflict boolean enableLog4j = false; - String userDefLog4jConf = - ConfigExtension.getUserDefinedLog4jConf(totalConf); + String userDefLog4jConf = ConfigExtension.getUserDefinedLog4jConf(totalConf); if (StringUtils.isBlank(userDefLog4jConf) == false) { enableLog4j = true; } @@ -601,8 +554,7 @@ class SyncProcessEvent extends ShutdownWork { } if (stormHome != null) { - List<String> stormHomeFiles = - PathUtils.read_dir_contents(stormHome); + List<String> stormHomeFiles = PathUtils.read_dir_contents(stormHome); for (String file : stormHomeFiles) { if (file.endsWith(".jar")) { @@ -610,13 +562,10 @@ class SyncProcessEvent extends ShutdownWork { } } - List<String> stormLibFiles = - PathUtils.read_dir_contents(stormHome + File.separator - + "lib"); + List<String> stormLibFiles = PathUtils.read_dir_contents(stormHome + File.separator + "lib"); for (String file : stormLibFiles) { if (file.endsWith(".jar")) { - classSet.add(stormHome + File.separator + "lib" - + File.separator + file); + classSet.add(stormHome + File.separator + "lib" + File.separator + file); } } @@ -646,8 +595,7 @@ class SyncProcessEvent extends ShutdownWork { String childopts = " "; if (stormConf.get(Config.TOPOLOGY_WORKER_CHILDOPTS) != null) { - childopts += - (String) stormConf.get(Config.TOPOLOGY_WORKER_CHILDOPTS); + childopts += (String) stormConf.get(Config.TOPOLOGY_WORKER_CHILDOPTS); } else if (ConfigExtension.getWorkerGc(stormConf) != null) { childopts += ConfigExtension.getWorkerGc(stormConf); } @@ -655,8 +603,7 @@ class SyncProcessEvent extends ShutdownWork { return childopts; } - public String getLogParameter(Map conf, String stormHome, - String topologyName, int port) { + public String getLogParameter(Map conf, String stormHome, String topologyName, int port) { final String LOGBACK_CONF_TAG = "logback.configurationFile"; final String LOGBACK_CONF_TAG_CMD = " -D" + LOGBACK_CONF_TAG + "="; final String DEFAULT_LOG_CONF = "jstorm.logback.xml"; @@ -664,13 +611,15 @@ class SyncProcessEvent extends ShutdownWork { String logFileName = JStormUtils.genLogName(topologyName, port); // String logFileName = topologyId + "-worker-" + port + ".log"; + StringBuilder commandSB = new StringBuilder(); commandSB.append(" -Dlogfile.name="); commandSB.append(logFileName); + commandSB.append(" -Dtopology.name=").append(topologyName); + // commandSB.append(" -Dlog4j.ignoreTCL=true"); - String userDefLogbackConf = - ConfigExtension.getUserDefinedLogbackConf(conf); + String userDefLogbackConf = ConfigExtension.getUserDefinedLogbackConf(conf); String logConf = System.getProperty(LOGBACK_CONF_TAG); if (StringUtils.isBlank(userDefLogbackConf) == false) { @@ -679,9 +628,7 @@ class SyncProcessEvent extends ShutdownWork { } else if (StringUtils.isBlank(logConf) == false) { commandSB.append(LOGBACK_CONF_TAG_CMD).append(logConf); } else if (StringUtils.isBlank(stormHome) == false) { - commandSB.append(LOGBACK_CONF_TAG_CMD).append(stormHome) - .append(File.separator).append("conf") - .append(File.separator).append(DEFAULT_LOG_CONF); + commandSB.append(LOGBACK_CONF_TAG_CMD).append(stormHome).append(File.separator).append("conf").append(File.separator).append(DEFAULT_LOG_CONF); } else { commandSB.append(LOGBACK_CONF_TAG_CMD + DEFAULT_LOG_CONF); } @@ -690,38 +637,35 @@ class SyncProcessEvent extends ShutdownWork { String userDefLog4jConf = ConfigExtension.getUserDefinedLog4jConf(conf); if (StringUtils.isBlank(userDefLog4jConf) == false) { LOG.info("Use user fined log4j conf " + userDefLog4jConf); - commandSB.append(" -D" + LOG4J_CONF_TAG + "=").append( - userDefLog4jConf); + commandSB.append(" -D" + LOG4J_CONF_TAG + "=").append(userDefLog4jConf); } return commandSB.toString(); } - private String getGcDumpParam(Map totalConf) { + private String getGcDumpParam(String topologyName, Map totalConf) { // String gcPath = ConfigExtension.getWorkerGcPath(totalConf); String gcPath = JStormUtils.getLogDir(); Date now = new Date(); String nowStr = TimeFormat.getSecond(now); - StringBuilder gc = new StringBuilder(); - + StringBuilder gc = new StringBuilder(256); gc.append(" -Xloggc:"); - gc.append(gcPath); - gc.append(File.separator); - gc.append("%TOPOLOGYID%-worker-%ID%-"); - gc.append(nowStr); + gc.append(gcPath).append(File.separator); + gc.append(topologyName).append(File.separator); + gc.append("%TOPOLOGYID%-worker-%ID%"); gc.append("-gc.log -verbose:gc -XX:HeapDumpPath="); - gc.append(gcPath).append(File.separator).append("java-%TOPOLOGYID%-") - .append(nowStr).append(".hprof"); + gc.append(gcPath).append(File.separator).append(topologyName).append(File.separator).append("java-%TOPOLOGYID%-").append(nowStr).append(".hprof"); gc.append(" "); + return gc.toString(); } /** * launch a worker in distributed mode - * + * * @param conf * @param sharedcontext * @param topologyId @@ -731,20 +675,17 @@ class SyncProcessEvent extends ShutdownWork { * @throws IOException * @pdOid 6ea369dd-5ce2-4212-864b-1f8b2ed94abb */ - public void launchWorker(Map conf, IContext sharedcontext, - String topologyId, String supervisorId, Integer port, - String workerId, LocalAssignment assignment) throws IOException { + public void launchWorker(Map conf, IContext sharedcontext, String topologyId, String supervisorId, Integer port, String workerId, LocalAssignment assignment) + throws IOException { // STORM-LOCAL-DIR/supervisor/stormdist/topologyId - String stormroot = - StormConfig.supervisor_stormdist_root(conf, topologyId); + String stormroot = StormConfig.supervisor_stormdist_root(conf, topologyId); // STORM-LOCAL-DIR/supervisor/stormdist/topologyId/stormjar.jar String stormjar = StormConfig.stormjar_path(stormroot); // get supervisor conf - Map stormConf = - StormConfig.read_supervisor_topology_conf(conf, topologyId); + Map stormConf = StormConfig.read_supervisor_topology_conf(conf, topologyId); Map totalConf = new HashMap(); totalConf.putAll(conf); @@ -761,12 +702,13 @@ class SyncProcessEvent extends ShutdownWork { String stormhome = System.getProperty("jstorm.home"); long memSize = assignment.getMem(); + long memMinSize = ConfigExtension.getMemMinSizePerWorker(totalConf); int cpuNum = assignment.getCpu(); long memGsize = memSize / JStormUtils.SIZE_1_G; int gcThreadsNum = memGsize > 4 ? (int) (memGsize * 1.5) : 4; String childopts = getChildOpts(totalConf); - childopts += getGcDumpParam(totalConf); + childopts += getGcDumpParam(Common.getTopologyNameById(topologyId), totalConf); Map<String, String> environment = new HashMap<String, String>(); @@ -776,15 +718,13 @@ class SyncProcessEvent extends ShutdownWork { environment.put("REDIRECT", "false"); } - environment.put("LD_LIBRARY_PATH", - (String) totalConf.get(Config.JAVA_LIBRARY_PATH)); + environment.put("LD_LIBRARY_PATH", (String) totalConf.get(Config.JAVA_LIBRARY_PATH)); StringBuilder commandSB = new StringBuilder(); try { if (this.cgroupManager != null) { - commandSB.append(cgroupManager.startNewWorker(totalConf, - cpuNum, workerId)); + commandSB.append(cgroupManager.startNewWorker(totalConf, cpuNum, workerId)); } } catch (Exception e) { LOG.error("fail to prepare cgroup to workerId: " + workerId, e); @@ -793,15 +733,21 @@ class SyncProcessEvent extends ShutdownWork { // commandSB.append("java -server -Xdebug -Xrunjdwp:transport=dt_socket,address=8000,server=y,suspend=n "); commandSB.append("java -server "); - commandSB.append(" -Xms" + memSize); + commandSB.append(" -Xms" + memMinSize); commandSB.append(" -Xmx" + memSize + " "); - commandSB.append(" -Xmn" + memSize / 3 + " "); - commandSB.append(" -XX:PermSize=" + memSize / 16); - commandSB.append(" -XX:MaxPermSize=" + memSize / 8); + if (memMinSize < (memSize / 2)) + commandSB.append(" -Xmn" + memMinSize + " "); + else + commandSB.append(" -Xmn" + memSize / 2 + " "); + if (memGsize >= 2) { + commandSB.append(" -XX:PermSize=" + memSize / 32); + } else { + commandSB.append(" -XX:PermSize=" + memSize / 16); + } + commandSB.append(" -XX:MaxPermSize=" + memSize / 16); commandSB.append(" -XX:ParallelGCThreads=" + gcThreadsNum); commandSB.append(" " + childopts); - commandSB.append(" " - + (assignment.getJvm() == null ? "" : assignment.getJvm())); + commandSB.append(" " + (assignment.getJvm() == null ? "" : assignment.getJvm())); commandSB.append(" -Djava.library.path="); commandSB.append((String) totalConf.get(Config.JAVA_LIBRARY_PATH)); @@ -811,20 +757,18 @@ class SyncProcessEvent extends ShutdownWork { commandSB.append(stormhome); } - commandSB.append(getLogParameter(totalConf, stormhome, - assignment.getTopologyName(), port)); + String logDir = System.getProperty("jstorm.log.dir"); + if (logDir != null) + commandSB.append(" -Djstorm.log.dir=").append(logDir); + commandSB.append(getLogParameter(totalConf, stormhome, assignment.getTopologyName(), port)); String classpath = getClassPath(stormjar, stormhome, totalConf); - String workerClassPath = - (String) totalConf.get(Config.TOPOLOGY_CLASSPATH); - List<String> otherLibs = - (List<String>) stormConf - .get(GenericOptionsParser.TOPOLOGY_LIB_NAME); + String workerClassPath = (String) totalConf.get(Config.TOPOLOGY_CLASSPATH); + List<String> otherLibs = (List<String>) stormConf.get(GenericOptionsParser.TOPOLOGY_LIB_NAME); StringBuilder sb = new StringBuilder(); if (otherLibs != null) { for (String libName : otherLibs) { - sb.append(StormConfig.stormlib_path(stormroot, libName)) - .append(":"); + sb.append(StormConfig.stormlib_path(stormroot, libName)).append(":"); } } workerClassPath = workerClassPath + ":" + sb.toString(); @@ -832,8 +776,7 @@ class SyncProcessEvent extends ShutdownWork { Map<String, String> policyReplaceMap = new HashMap<String, String>(); String realClassPath = classpath + ":" + workerClassPath; policyReplaceMap.put(SandBoxMaker.CLASS_PATH_KEY, realClassPath); - commandSB - .append(sandBoxMaker.sandboxPolicy(workerId, policyReplaceMap)); + commandSB.append(sandBoxMaker.sandboxPolicy(workerId, policyReplaceMap)); commandSB.append(" -cp "); // commandSB.append(workerClassPath + ":"); @@ -871,9 +814,7 @@ class SyncProcessEvent extends ShutdownWork { JStormUtils.launch_process(cmd, environment, true); } - private Set<Integer> killUselessWorkers( - Map<String, StateHeartbeat> localWorkerStats, - Map<Integer, LocalAssignment> localAssignments, + private Set<Integer> killUselessWorkers(Map<String, StateHeartbeat> localWorkerStats, Map<Integer, LocalAssignment> localAssignments, Map<String, Integer> taskCleanupTimeoutMap) { Map<String, String> removed = new HashMap<String, String>(); Set<Integer> keepPorts = new HashSet<Integer>(); @@ -882,8 +823,7 @@ class SyncProcessEvent extends ShutdownWork { String workerid = entry.getKey(); StateHeartbeat hbstate = entry.getValue(); - if (workerIdToStartTimeAndPort.containsKey(workerid) - && hbstate.getState().equals(State.notStarted)) + if (workerIdToStartTimeAndPort.containsKey(workerid) && hbstate.getState().equals(State.notStarted)) continue; if (hbstate.getState().equals(State.valid)) { @@ -891,8 +831,7 @@ class SyncProcessEvent extends ShutdownWork { keepPorts.add(hbstate.getHeartbeat().getPort()); } else { if (hbstate.getHeartbeat() != null) { - removed.put(workerid, hbstate.getHeartbeat() - .getTopologyId()); + removed.put(workerid, hbstate.getHeartbeat().getTopologyId()); } else { removed.put(workerid, null); } @@ -910,14 +849,12 @@ class SyncProcessEvent extends ShutdownWork { } } - shutWorker(conf, supervisorId, removed, workerThreadPids, - cgroupManager, false, killingWorkers, taskCleanupTimeoutMap); + shutWorker(conf, supervisorId, removed, workerThreadPids, cgroupManager, false, killingWorkers, taskCleanupTimeoutMap); Set<String> activeTopologys = new HashSet<String>(); if (killingWorkers.size() == 0) { // When all workers under killing are killed successfully, // clean the task cleanup timeout map correspondingly. - for (Entry<Integer, LocalAssignment> entry : localAssignments - .entrySet()) { + for (Entry<Integer, LocalAssignment> entry : localAssignments.entrySet()) { activeTopologys.add(entry.getValue().getTopologyId()); } @@ -936,8 +873,7 @@ class SyncProcessEvent extends ShutdownWork { localWorkerStats.remove(removedWorkerId); } // Keep the workers which are still under starting - for (Entry<String, Pair<Integer, Integer>> entry : workerIdToStartTimeAndPort - .entrySet()) { + for (Entry<String, Pair<Integer, Integer>> entry : workerIdToStartTimeAndPort.entrySet()) { String workerId = entry.getKey(); StateHeartbeat hbstate = localWorkerStats.get(workerId); if (hbstate != null) @@ -948,14 +884,12 @@ class SyncProcessEvent extends ShutdownWork { return keepPorts; } - private void startNewWorkers(Set<Integer> keepPorts, - Map<Integer, LocalAssignment> localAssignments) throws Exception { + private void startNewWorkers(Set<Integer> keepPorts, Map<Integer, LocalAssignment> localAssignments, Set<String> downloadFailedTopologyIds) + throws Exception { /** - * Step 4: get reassigned tasks, which is in assignedTasks, but not in - * keeperPorts Map<port(type Integer), LocalAssignment> + * Step 4: get reassigned tasks, which is in assignedTasks, but not in keeperPorts Map<port(type Integer), LocalAssignment> */ - Map<Integer, LocalAssignment> newWorkers = - JStormUtils.select_keys_pred(keepPorts, localAssignments); + Map<Integer, LocalAssignment> newWorkers = JStormUtils.select_keys_pred(keepPorts, localAssignments); /** * Step 5: generate new work ids @@ -965,7 +899,10 @@ class SyncProcessEvent extends ShutdownWork { for (Entry<Integer, LocalAssignment> entry : newWorkers.entrySet()) { Integer port = entry.getKey(); LocalAssignment assignment = entry.getValue(); - + if (assignment != null && assignment.getTopologyId() != null && downloadFailedTopologyIds.contains(assignment.getTopologyId())) { + LOG.info("Can't start this worker: " + port + " about the topology: " + assignment.getTopologyId() + ", due to the damaged binary !!"); + continue; + } String workerId = UUID.randomUUID().toString(); newWorkerIds.put(port, workerId); @@ -994,18 +931,14 @@ class SyncProcessEvent extends ShutdownWork { String clusterMode = StormConfig.cluster_mode(conf); if (clusterMode.equals("distributed")) { - launchWorker(conf, sharedContext, - assignment.getTopologyId(), supervisorId, port, - workerId, assignment); + launchWorker(conf, sharedContext, assignment.getTopologyId(), supervisorId, port, workerId, assignment); } else if (clusterMode.equals("local")) { - launchWorker(conf, sharedContext, - assignment.getTopologyId(), supervisorId, port, - workerId, workerThreadPids); + launchWorker(conf, sharedContext, assignment.getTopologyId(), supervisorId, port, workerId, workerThreadPids); } } catch (Exception e) { - String errorMsg = - "Failed to launchWorker workerId:" + workerId + ":" - + port; + workerReportError.report(assignment.getTopologyId(), port, + assignment.getTaskIds(), new String(JStormUtils.getErrorInfo(e))); + String errorMsg = "Failed to launchWorker workerId:" + workerId + ":" + port; LOG.error(errorMsg, e); throw e; } @@ -1013,8 +946,7 @@ class SyncProcessEvent extends ShutdownWork { } /** - * FIXME, workerIds should be Set, not Collection, but here simplify the - * logic + * FIXME, workerIds should be Set, not Collection, but here simplify the logic */ markAllNewWorkers(newWorkerIds); // try { @@ -1027,6 +959,9 @@ class SyncProcessEvent extends ShutdownWork { } boolean isWorkerDead(String workerId) { + if (ConfigExtension.isCheckWorkerAliveBySystemInfo(conf) == false) { + return false; + } try { List<String> pids = getPid(conf, workerId); @@ -1046,9 +981,7 @@ class SyncProcessEvent extends ShutdownWork { return true; } catch (IOException e) { - LOG.info( - "Failed to check whether worker is dead through /proc/pid", - e); + LOG.info("Failed to check whether worker is dead through /proc/pid", e); return false; }
