http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/utils/JStormServerUtils.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/JStormServerUtils.java b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/JStormServerUtils.java index 6000688..59e14d9 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/JStormServerUtils.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/JStormServerUtils.java @@ -43,12 +43,10 @@ import com.alibaba.jstorm.cluster.StormConfig; */ public class JStormServerUtils { - private static final Logger LOG = LoggerFactory - .getLogger(JStormServerUtils.class); + private static final Logger LOG = LoggerFactory.getLogger(JStormServerUtils.class); - public static void downloadCodeFromMaster(Map conf, String localRoot, - String masterCodeDir, String topologyId, boolean isSupervisor) - throws IOException, TException { + public static void downloadCodeFromMaster(Map conf, String localRoot, String masterCodeDir, String topologyId, boolean isSupervisor) throws IOException, + TException { FileUtils.forceMkdir(new File(localRoot)); FileUtils.forceMkdir(new File(StormConfig.stormlib_path(localRoot))); @@ -64,25 +62,18 @@ public class JStormServerUtils { String masterStormConfPath = StormConfig.stormconf_path(masterCodeDir); Utils.downloadFromMaster(conf, masterStormConfPath, localStormConfPath); - Map stormConf = - (Map) StormConfig.readLocalObject(topologyId, - localStormConfPath); + Map stormConf = (Map) StormConfig.readLocalObject(topologyId, localStormConfPath); if (stormConf == null) throw new IOException("Get topology conf error: " + topologyId); - List<String> libs = - (List<String>) stormConf - .get(GenericOptionsParser.TOPOLOGY_LIB_NAME); + List<String> libs = (List<String>) stormConf.get(GenericOptionsParser.TOPOLOGY_LIB_NAME); if (libs == null) return; for (String libName : libs) { - 
String localStromLibPath = - StormConfig.stormlib_path(localRoot, libName); - String masterStormLibPath = - StormConfig.stormlib_path(masterCodeDir, libName); - Utils.downloadFromMaster(conf, masterStormLibPath, - localStromLibPath); + String localStromLibPath = StormConfig.stormlib_path(localRoot, libName); + String masterStormLibPath = StormConfig.stormlib_path(masterCodeDir, libName); + Utils.downloadFromMaster(conf, masterStormLibPath, localStromLibPath); } }
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/utils/JStormUtils.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/JStormUtils.java b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/JStormUtils.java index 983f579..ad56815 100644 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/JStormUtils.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/JStormUtils.java @@ -53,6 +53,7 @@ import org.apache.commons.exec.ExecuteException; import org.apache.commons.exec.ExecuteResultHandler; import org.apache.commons.exec.PumpStreamHandler; import org.apache.commons.lang.StringUtils; +import org.apache.thrift.TFieldIdEnum; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -67,11 +68,9 @@ import com.alibaba.jstorm.client.ConfigExtension; * JStorm utility * * @author yannian/Longda/Xin.Zhou/Xin.Li - * */ public class JStormUtils { - private static final Logger LOG = LoggerFactory - .getLogger(JStormUtils.class); + private static final Logger LOG = LoggerFactory.getLogger(JStormUtils.class); public static long SIZE_1_K = 1024; public static long SIZE_1_M = SIZE_1_K * 1024; @@ -223,8 +222,7 @@ public class JStormUtils { } /** - * Gets the pid of this JVM, because Java doesn't provide a real way to do - * this. + * Gets the pid of this JVM, because Java doesn't provide a real way to do this. 
* * @return */ @@ -238,8 +236,7 @@ public class JStormUtils { return split[0]; } - public static void exec_command(String command) throws ExecuteException, - IOException { + public static void exec_command(String command) throws ExecuteException, IOException { String[] cmdlist = command.split(" "); CommandLine cmd = new CommandLine(cmdlist[0]); for (int i = 1; i < cmdlist.length; i++) { @@ -257,14 +254,12 @@ public class JStormUtils { * @param dir * @param destdir */ - public static void extract_dir_from_jar(String jarpath, String dir, - String destdir) { + public static void extract_dir_from_jar(String jarpath, String dir, String destdir) { String cmd = "unzip -qq " + jarpath + " " + dir + "/** -d " + destdir; try { exec_command(cmd); } catch (Exception e) { - LOG.warn("No " + dir + " from " + jarpath + " by cmd:" + cmd - + "!\n" + e.getMessage()); + LOG.warn("No " + dir + " from " + jarpath + " by cmd:" + cmd + "!\n" + e.getMessage()); } } @@ -278,8 +273,7 @@ public class JStormUtils { LOG.info("kill -9 process " + pid); sleepMs(100); } catch (ExecuteException e) { - LOG.info("Error when trying to kill " + pid - + ". Process has been killed"); + LOG.info("Error when trying to kill " + pid + ". Process has been killed"); } catch (Exception e) { LOG.info("Error when trying to kill " + pid + ".Exception ", e); } @@ -291,8 +285,7 @@ public class JStormUtils { exec_command("kill " + pid); LOG.info("kill process " + pid); } catch (ExecuteException e) { - LOG.info("Error when trying to kill " + pid - + ". Process has been killed. "); + LOG.info("Error when trying to kill " + pid + ". Process has been killed. 
"); } catch (Exception e) { LOG.info("Error when trying to kill " + pid + ".Exception ", e); } @@ -349,7 +342,8 @@ public class JStormUtils { String output = null; try { String pid = JStormUtils.process_pid(); - output = SystemOperation.exec("top -b -n 1 | grep " + pid); + String command = String.format("top -b -n 1 -p %s | grep -w %s", pid, pid); + output = SystemOperation.exec(command); String subStr = output.substring(output.indexOf("S") + 1); for (int i = 0; i < subStr.length(); i++) { char ch = subStr.charAt(i); @@ -369,64 +363,89 @@ public class JStormUtils { return value; } - + + public static Double getDiskUsage() { + if (!OSInfo.isLinux() && !OSInfo.isMac()) { + return 0.0; + } + try { + String output = SystemOperation.exec("df -h /"); + if (output != null) { + String[] lines = output.split("[\\r\\n]+"); + if (lines.length >= 2) { + String[] parts = lines[1].split("\\s+"); + if (parts.length >= 5) { + String pct = parts[4]; + if (pct.endsWith("%")) { + return Integer.valueOf(pct.substring(0, pct.length() - 1)) / 100.0; + } + } + } + } + } catch (Exception e) { + LOG.warn("failed to get disk usage:", e); + } + return 0.0; + } + public static Double getMemUsage() { - if (OSInfo.isLinux() == true) { - try { - Double value = 0.0; + if (OSInfo.isLinux() == true) { + try { + Double value = 0.0; String pid = JStormUtils.process_pid(); - String output = SystemOperation.exec("top -b -n 1 | grep " + pid); - - int m = 0; - String[] strArray = output.split(" "); - for (int i = 0; i < strArray.length; i++) { - String info = strArray[i]; - if (info.trim().length() == 0){ - continue; - } - if(m == 5) { - // memory - String unit = info.substring(info.length() - 1); - - if(unit.equalsIgnoreCase("g")) { - value = Double.parseDouble(info.substring(0, info.length() - 1)); + String command = String.format("top -b -n 1 -p %s | grep -w %s", pid, pid); + String output = SystemOperation.exec(command); + + int m = 0; + String[] strArray = output.split(" "); + for (int i = 0; i < 
strArray.length; i++) { + String info = strArray[i]; + if (info.trim().length() == 0) { + continue; + } + if (m == 5) { + // memory + String unit = info.substring(info.length() - 1); + + if (unit.equalsIgnoreCase("g")) { + value = Double.parseDouble(info.substring(0, info.length() - 1)); value *= 1000000000; - } else if(unit.equalsIgnoreCase("m")) { - value = Double.parseDouble(info.substring(0, info.length() - 1)); - value *= 1000000; - } else { - value = Double.parseDouble(info); - } + } else if (unit.equalsIgnoreCase("m")) { + value = Double.parseDouble(info.substring(0, info.length() - 1)); + value *= 1000000; + } else { + value = Double.parseDouble(info); + } + + //LOG.info("!!!! Get Memory Size:{}, info:{}", value, info); return value; - } - if(m == 8) { - // cpu usage - - } - if(m == 9) { - // memory ratio - - } - m++; - } + } + if (m == 8) { + // cpu usage + + } + if (m == 9) { + // memory ratio + + } + m++; + } } catch (Exception e) { LOG.warn("Failed to get memory usage ."); } - } - - // this will be incorrect - MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean(); + } + + // this will be incorrect + MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean(); MemoryUsage memoryUsage = memoryMXBean.getHeapMemoryUsage(); return Double.valueOf(memoryUsage.getUsed()); } /** - * If it is backend, please set resultHandler, such as - * DefaultExecuteResultHandler If it is frontend, - * ByteArrayOutputStream.toString get the result - * + * If it is backend, please set resultHandler, such as DefaultExecuteResultHandler If it is frontend, ByteArrayOutputStream.toString get the result + * <p/> * This function don't care whether the command is successfully or not * * @param command @@ -436,9 +455,8 @@ public class JStormUtils { * @return * @throws IOException */ - public static ByteArrayOutputStream launchProcess(String command, - final Map environment, final String workDir, - ExecuteResultHandler resultHandler) throws IOException { + public static 
ByteArrayOutputStream launchProcess(String command, final Map environment, final String workDir, ExecuteResultHandler resultHandler) + throws IOException { String[] cmdlist = command.split(" "); @@ -479,8 +497,7 @@ public class JStormUtils { } - protected static java.lang.Process launchProcess(final String[] cmdlist, - final Map<String, String> environment) throws IOException { + protected static Process launchProcess(final String[] cmdlist, final Map<String, String> environment) throws IOException { ArrayList<String> buff = new ArrayList<String>(); for (String tok : cmdlist) { if (!tok.isEmpty()) { @@ -499,33 +516,26 @@ public class JStormUtils { } /** - * @@@ it should use DefaultExecutor to start a process, but some little - * problem have been found, such as exitCode/output string so still use - * the old method to start process - * * @param command * @param environment * @param backend * @return * @throws IOException + * @@@ it should use DefaultExecutor to start a process, but some little problem have been found, such as exitCode/output string so still use the old method + * to start process */ - public static java.lang.Process launch_process(final String command, - final Map<String, String> environment, boolean backend) - throws IOException { + public static Process launch_process(final String command, final Map<String, String> environment, boolean backend) throws IOException { if (backend == true) { new Thread(new Runnable() { @Override public void run() { - String[] cmdlist = - (new String("nohup " + command + " &")).split(" "); + String[] cmdlist = (new String("nohup " + command + " &")).split(" "); try { launchProcess(cmdlist, environment); } catch (IOException e) { - LOG.error( - "Failed to run " + command + ":" + e.getCause(), - e); + LOG.error("Failed to run " + command + ":" + e.getCause(), e); } } }).start(); @@ -568,9 +578,8 @@ public class JStormUtils { } /** - * * if the list exist repeat string, return the repeated string - * + * <p/> * this 
function will be used to check wheter bolt or spout exist same id * * @param sets @@ -629,7 +638,7 @@ public class JStormUtils { return rtn; } - public static <T> Long bit_xor_vals(java.util.List<T> vals) { + public static <T> Long bit_xor_vals(List<T> vals) { Long rtn = 0l; for (T n : vals) { rtn = bit_xor(rtn, n); @@ -638,7 +647,7 @@ public class JStormUtils { return rtn; } - public static <T> Long bit_xor_vals_sets(java.util.Set<T> vals) { + public static <T> Long bit_xor_vals_sets(Set<T> vals) { Long rtn = 0l; for (T n : vals) { rtn = bit_xor(rtn, n); @@ -675,7 +684,7 @@ public class JStormUtils { return rtn; } - public static <V> List<V> mk_list(java.util.Set<V> args) { + public static <V> List<V> mk_list(Set<V> args) { ArrayList<V> rtn = new ArrayList<V>(); if (args != null) { for (V o : args) { @@ -712,8 +721,7 @@ public class JStormUtils { } else if (o instanceof Long) { return (Long) o; } else { - throw new RuntimeException("Invalid value " - + o.getClass().getName() + " " + o); + throw new RuntimeException("Invalid value " + o.getClass().getName() + " " + o); } } @@ -733,8 +741,18 @@ public class JStormUtils { } else if (o instanceof Double) { return (Double) o; } else { - throw new RuntimeException("Invalid value " - + o.getClass().getName() + " " + o); + throw new RuntimeException("Invalid value " + o.getClass().getName() + " " + o); + } + } + + public static Double parseDouble(Object o, double defaultValue) { + if (o == null) { + return defaultValue; + } + try { + return parseDouble(o); + } catch (Exception ignored) { + return defaultValue; } } @@ -769,8 +787,7 @@ public class JStormUtils { } else if (o instanceof Integer) { return (Integer) o; } else { - throw new RuntimeException("Invalid value " - + o.getClass().getName() + " " + o); + throw new RuntimeException("Invalid value " + o.getClass().getName() + " " + o); } } @@ -791,6 +808,20 @@ public class JStormUtils { } } + public static Boolean parseBoolean(Object o) { + if (o == null) { + return 
null; + } + + if (o instanceof String) { + return Boolean.valueOf((String) o); + } else if (o instanceof Boolean) { + return (Boolean) o; + } else { + throw new RuntimeException("Invalid value " + o.getClass().getName() + " " + o); + } + } + public static boolean parseBoolean(Object o, boolean defaultValue) { if (o == null) { return defaultValue; @@ -863,8 +894,7 @@ public class JStormUtils { } else if (oldValue instanceof BigInteger) { return ((BigInteger) oldValue).add((BigInteger) newValue); } else if (oldValue instanceof Number) { - return ((Number) oldValue).doubleValue() - + ((Number) newValue).doubleValue(); + return ((Number) oldValue).doubleValue() + ((Number) newValue).doubleValue(); } else { return null; } @@ -933,8 +963,7 @@ public class JStormUtils { public static String formatSimpleDouble(Double value) { try { - java.text.DecimalFormat form = - new java.text.DecimalFormat("##0.000"); + java.text.DecimalFormat form = new java.text.DecimalFormat("##0.000"); String s = form.format(value); return s; } catch (Exception e) { @@ -955,8 +984,7 @@ public class JStormUtils { public static double formatDoubleDecPoint4(Double value) { try { - java.text.DecimalFormat form = - new java.text.DecimalFormat("###.0000"); + java.text.DecimalFormat form = new java.text.DecimalFormat("###.0000"); String s = form.format(value); return Double.valueOf(s); } catch (Exception e) { @@ -1041,18 +1069,13 @@ public class JStormUtils { } /** - * @@@ Todo - * * @return + * @@@ Todo */ public static Long getPhysicMemorySize() { Object object; try { - object = - ManagementFactory.getPlatformMBeanServer().getAttribute( - new ObjectName("java.lang", "type", - "OperatingSystem"), - "TotalPhysicalMemorySize"); + object = ManagementFactory.getPlatformMBeanServer().getAttribute(new ObjectName("java.lang", "type", "OperatingSystem"), "TotalPhysicalMemorySize"); } catch (Exception e) { LOG.warn("Failed to get system physical memory size,", e); return null; @@ -1089,19 +1112,15 @@ public class 
JStormUtils { public static String getLogFileName() { try { - Logger rootLogger = - LoggerFactory.getLogger(org.slf4j.Logger.ROOT_LOGGER_NAME); + Logger rootLogger = LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME); if (rootLogger instanceof ch.qos.logback.classic.Logger) { - ch.qos.logback.classic.Logger logbackLogger = - (ch.qos.logback.classic.Logger) rootLogger; + ch.qos.logback.classic.Logger logbackLogger = (ch.qos.logback.classic.Logger) rootLogger; // Logger framework is Logback - for (Iterator<ch.qos.logback.core.Appender<ch.qos.logback.classic.spi.ILoggingEvent>> index = - logbackLogger.iteratorForAppenders(); index.hasNext();) { - ch.qos.logback.core.Appender<ch.qos.logback.classic.spi.ILoggingEvent> appender = - index.next(); + for (Iterator<ch.qos.logback.core.Appender<ch.qos.logback.classic.spi.ILoggingEvent>> index = logbackLogger.iteratorForAppenders(); index + .hasNext();) { + ch.qos.logback.core.Appender<ch.qos.logback.classic.spi.ILoggingEvent> appender = index.next(); if (appender instanceof ch.qos.logback.core.FileAppender) { - ch.qos.logback.core.FileAppender fileAppender = - (ch.qos.logback.core.FileAppender) appender; + ch.qos.logback.core.FileAppender fileAppender = (ch.qos.logback.core.FileAppender) appender; return fileAppender.getFile(); } } @@ -1156,8 +1175,7 @@ public class JStormUtils { FileOutputStream workerOut = new FileOutputStream(new File(file)); - PrintStream ps = - new PrintStream(new BufferedOutputStream(workerOut), true); + PrintStream ps = new PrintStream(new BufferedOutputStream(workerOut), true); System.setOut(ps); System.setErr(ps); @@ -1170,13 +1188,11 @@ public class JStormUtils { return new AsyncLoopDefaultKill(); } - public static TreeMap<Integer, Integer> integer_divided(int sum, - int num_pieces) { + public static TreeMap<Integer, Integer> integer_divided(int sum, int num_pieces) { return Utils.integerDivided(sum, num_pieces); } - public static <K, V> HashMap<K, V> filter_val(RunnableCallback fn, - Map<K, V> amap) 
{ + public static <K, V> HashMap<K, V> filter_val(RunnableCallback fn, Map<K, V> amap) { HashMap<K, V> rtn = new HashMap<K, V>(); for (Entry<K, V> entry : amap.entrySet()) { @@ -1191,16 +1207,14 @@ public class JStormUtils { } public static List<Integer> getSupervisorPortList(Map conf) { - List<Integer> portList = - (List<Integer>) conf.get(Config.SUPERVISOR_SLOTS_PORTS); + List<Integer> portList = (List<Integer>) conf.get(Config.SUPERVISOR_SLOTS_PORTS); if (portList != null && portList.size() > 0) { return portList; } LOG.info("Generate port list through CPU cores and system memory size"); - double cpuWeight = - ConfigExtension.getSupervisorSlotsPortCpuWeight(conf); + double cpuWeight = ConfigExtension.getSupervisorSlotsPortCpuWeight(conf); int sysCpuNum = 4; try { sysCpuNum = Runtime.getRuntime().availableProcessors(); @@ -1211,11 +1225,11 @@ public class JStormUtils { int cpuPortNum = (int) (sysCpuNum / cpuWeight); if (cpuPortNum < 1) { - LOG.info("Invalid supervisor.slots.port.cpu.weight setting :" - + cpuWeight + ", cpu cores:" + sysCpuNum); + LOG.info("Invalid supervisor.slots.port.cpu.weight setting :" + cpuWeight + ", cpu cores:" + sysCpuNum); cpuPortNum = 1; } + Double memWeight = ConfigExtension.getSupervisorSlotsPortMemWeight(conf); int memPortNum = Integer.MAX_VALUE; Long physicalMemSize = JStormUtils.getPhysicMemorySize(); if (physicalMemSize == null) { @@ -1223,7 +1237,7 @@ public class JStormUtils { } else { LOG.info("Get system memory size :" + physicalMemSize); long workerMemSize = ConfigExtension.getMemSizePerWorker(conf); - memPortNum = (int) (physicalMemSize / workerMemSize); + memPortNum = (int) (physicalMemSize / (workerMemSize * memWeight)); if (memPortNum < 1) { LOG.info("Invalide worker.memory.size setting:" + workerMemSize); memPortNum = 4; @@ -1261,14 +1275,10 @@ public class JStormUtils { } public static Object createDisruptorWaitStrategy(Map conf) { - String waitStrategy = - (String) conf.get(Config.TOPOLOGY_DISRUPTOR_WAIT_STRATEGY); + 
String waitStrategy = (String) conf.get(Config.TOPOLOGY_DISRUPTOR_WAIT_STRATEGY); Object ret; - - if (waitStrategy.indexOf("TimeoutBlockingWaitStrategy") != -1) { - long timeout = - parseLong(conf.get(Config.TOPOLOGY_DISRUPTOR_WAIT_TIMEOUT), - 10); + if (waitStrategy.contains("TimeoutBlockingWaitStrategy")) { + long timeout = parseLong(conf.get(Config.TOPOLOGY_DISRUPTOR_WAIT_TIMEOUT), 10); ret = Utils.newInstance(waitStrategy, timeout, TimeUnit.MILLISECONDS); } else { ret = Utils.newInstance(waitStrategy); @@ -1276,4 +1286,64 @@ public class JStormUtils { return ret; } + + public static Object thriftToObject(Object obj) { + Object ret = null; + if (obj instanceof org.apache.thrift.TBase) { + ret = thriftToMap((org.apache.thrift.TBase)obj); + }else if (obj instanceof List) { + ret = new ArrayList<>(); + for (Object item : (List)obj) { + ((List)ret).add(thriftToObject(item)); + } + }else if (obj instanceof Map) { + ret = new HashMap<String, Object>(); + Set<Entry> entrySet = ((Map)obj).entrySet(); + for (Entry entry : entrySet) { + ((Map)ret).put(String.valueOf(entry.getKey()), thriftToObject(entry.getValue())); + } + }else { + + ret = String.valueOf(obj); + } + + return ret; + } + + public static Map<String, Object> thriftToMap( + org.apache.thrift.TBase thriftObj) { + Map<String, Object> ret = new HashMap<String, Object>(); + + int i = 1; + TFieldIdEnum field = thriftObj.fieldForId(i); + while(field != null) { + if (thriftObj.isSet(field)) { + Object obj = thriftObj.getFieldValue(field); + ret.put(field.getFieldName(), thriftToObject(obj)); + + } + field = thriftObj.fieldForId(++i); + } + + return ret; + } + + public static List<Map<String, Object>> thriftToMap(List thriftObjs) { + List<Map<String, Object> > ret = new ArrayList<Map<String, Object> > () ; + + for (Object thriftObj : thriftObjs) { + ret.add(thriftToMap((org.apache.thrift.TBase)thriftObj)); + } + + return ret; + } + + + public static long halfValueOfSum(long v1, long v2, boolean increment) { + long 
ret = (v1 + v2) / 2; + if (increment) { + ret += (v1 + v2) % 2; + } + return ret; + } } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/utils/LoadConf.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/LoadConf.java b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/LoadConf.java index d082fcc..91be977 100644 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/LoadConf.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/LoadConf.java @@ -21,121 +21,121 @@ import org.yaml.snakeyaml.Yaml; import org.yaml.snakeyaml.constructor.SafeConstructor; public class LoadConf { - private static final Logger LOG = LoggerFactory.getLogger(LoadConf.class); - - public static List<URL> findResources(String name) { - try { - Enumeration<URL> resources = Thread.currentThread().getContextClassLoader().getResources(name); - List<URL> ret = new ArrayList<URL>(); - while (resources.hasMoreElements()) { - ret.add(resources.nextElement()); - } - return ret; - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - /** - * - * @param name - * @param mustExist -- if this is true, the file must exist, otherwise throw exception - * @param canMultiple -- if this is false and there is multiple conf, it will throw exception - * @return - */ - public static Map findAndReadYaml(String name, boolean mustExist, boolean canMultiple) { - InputStream in = null; - boolean confFileEmpty = false; - try { - in = getConfigFileInputStream(name, canMultiple); - if (null != in) { - Yaml yaml = new Yaml(new SafeConstructor()); - Map ret = (Map) yaml.load(new InputStreamReader(in)); - if (null != ret) { - return new HashMap(ret); - } else { - confFileEmpty = true; - } - } - - if (mustExist) { - if (confFileEmpty) - throw new RuntimeException("Config file " + name + " doesn't have any valid storm configs"); - else - throw new 
RuntimeException("Could not find config file on classpath " + name); - } else { - return new HashMap(); - } - } catch (IOException e) { - throw new RuntimeException(e); - } finally { - if (null != in) { - try { - in.close(); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - } - } - - public static InputStream getConfigFileInputStream(String configFilePath, boolean canMultiple) throws IOException { - if (null == configFilePath) { - throw new IOException("Could not find config file, name not specified"); - } - - HashSet<URL> resources = new HashSet<URL>(findResources(configFilePath)); - if (resources.isEmpty()) { - File configFile = new File(configFilePath); - if (configFile.exists()) { - return new FileInputStream(configFile); - } - } else if (resources.size() > 1 && canMultiple == false) { - throw new IOException("Found multiple " + configFilePath - + " resources. You're probably bundling the Storm jars with your topology jar. " + resources); - } else { - LOG.info("Using " + configFilePath + " from resources"); - URL resource = resources.iterator().next(); - return resource.openStream(); - } - return null; - } - - public static InputStream getConfigFileInputStream(String configFilePath) throws IOException { - return getConfigFileInputStream(configFilePath, true); - } - - public static Map LoadYaml(String confPath) { - - return findAndReadYaml(confPath, true, true); - - } - - public static Map LoadProperty(String prop) { - - InputStream in = null; - Properties properties = new Properties(); - - try { - in = getConfigFileInputStream(prop); - properties.load(in); - } catch (FileNotFoundException e) { - throw new RuntimeException("No such file " + prop); - } catch (Exception e1) { - throw new RuntimeException("Failed to read config file"); - } finally { - if (null != in) { - try { - in.close(); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - } - - Map ret = new HashMap(); - ret.putAll(properties); - return ret; - } + private 
static final Logger LOG = LoggerFactory.getLogger(LoadConf.class); + + public static List<URL> findResources(String name) { + try { + Enumeration<URL> resources = Thread.currentThread().getContextClassLoader().getResources(name); + List<URL> ret = new ArrayList<URL>(); + while (resources.hasMoreElements()) { + ret.add(resources.nextElement()); + } + return ret; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + /** + * + * @param name + * @param mustExist -- if this is true, the file must exist, otherwise throw exception + * @param canMultiple -- if this is false and there is multiple conf, it will throw exception + * @return + */ + public static Map findAndReadYaml(String name, boolean mustExist, boolean canMultiple) { + InputStream in = null; + boolean confFileEmpty = false; + try { + in = getConfigFileInputStream(name, canMultiple); + if (null != in) { + Yaml yaml = new Yaml(new SafeConstructor()); + Map ret = (Map) yaml.load(new InputStreamReader(in)); + if (null != ret) { + return new HashMap(ret); + } else { + confFileEmpty = true; + } + } + + if (mustExist) { + if (confFileEmpty) + throw new RuntimeException("Config file " + name + " doesn't have any valid storm configs"); + else + throw new RuntimeException("Could not find config file on classpath " + name); + } else { + return new HashMap(); + } + } catch (IOException e) { + throw new RuntimeException(e); + } finally { + if (null != in) { + try { + in.close(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } + } + + public static InputStream getConfigFileInputStream(String configFilePath, boolean canMultiple) throws IOException { + if (null == configFilePath) { + throw new IOException("Could not find config file, name not specified"); + } + + HashSet<URL> resources = new HashSet<URL>(findResources(configFilePath)); + if (resources.isEmpty()) { + File configFile = new File(configFilePath); + if (configFile.exists()) { + return new FileInputStream(configFile); + 
} + } else if (resources.size() > 1 && canMultiple == false) { + throw new IOException("Found multiple " + configFilePath + " resources. You're probably bundling the Storm jars with your topology jar. " + + resources); + } else { + LOG.info("Using " + configFilePath + " from resources"); + URL resource = resources.iterator().next(); + return resource.openStream(); + } + return null; + } + + public static InputStream getConfigFileInputStream(String configFilePath) throws IOException { + return getConfigFileInputStream(configFilePath, true); + } + + public static Map LoadYaml(String confPath) { + + return findAndReadYaml(confPath, true, true); + + } + + public static Map LoadProperty(String prop) { + + InputStream in = null; + Properties properties = new Properties(); + + try { + in = getConfigFileInputStream(prop); + properties.load(in); + } catch (FileNotFoundException e) { + throw new RuntimeException("No such file " + prop); + } catch (Exception e1) { + throw new RuntimeException("Failed to read config file"); + } finally { + if (null != in) { + try { + in.close(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } + + Map ret = new HashMap(); + ret.putAll(properties); + return ret; + } } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/utils/NetWorkUtils.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/NetWorkUtils.java b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/NetWorkUtils.java index 8bca599..e005f38 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/NetWorkUtils.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/NetWorkUtils.java @@ -103,41 +103,39 @@ public class NetWorkUtils { try { address = InetAddress.getByName(host); } catch (UnknownHostException e) { - LOG.warn("NetWorkUtil can't transfer hostname(" + host - + ") to ip, return hostname", e); + 
LOG.warn("NetWorkUtil can't transfer hostname(" + host + ") to ip, return hostname", e); return host; } return address.getHostAddress(); } - - public static List<String> host2Ip(List<String> servers) { - if (servers == null || servers.size() == 0) { - return new ArrayList<String>(); - } - - Set<String> ret = new HashSet<String>(); - for (String server : servers) { - if (StringUtils.isBlank(server)) { - continue; - } - - InetAddress ia; - try { - ia = InetAddress.getByName(server); - } catch (UnknownHostException e) { - // TODO Auto-generated catch block - LOG.info("Fail to get address of ", server); - continue; - } - if (ia.isLoopbackAddress() || ia.isAnyLocalAddress()) { - ret.add(NetWorkUtils.ip()); - }else { - ret.add(ia.getHostAddress()); - } - } - - - return JStormUtils.mk_list(ret); + + public static List<String> host2Ip(List<String> servers) { + if (servers == null || servers.size() == 0) { + return new ArrayList<String>(); + } + + Set<String> ret = new HashSet<String>(); + for (String server : servers) { + if (StringUtils.isBlank(server)) { + continue; + } + + InetAddress ia; + try { + ia = InetAddress.getByName(server); + } catch (UnknownHostException e) { + // TODO Auto-generated catch block + LOG.info("Fail to get address of ", server); + continue; + } + if (ia.isLoopbackAddress() || ia.isAnyLocalAddress()) { + ret.add(NetWorkUtils.ip()); + } else { + ret.add(ia.getHostAddress()); + } + } + + return JStormUtils.mk_list(ret); } public static String ip2Host(String ip) { @@ -145,8 +143,7 @@ public class NetWorkUtils { try { address = InetAddress.getByName(ip); } catch (UnknownHostException e) { - LOG.warn("NetWorkUtil can't transfer ip(" + ip - + ") to hostname, return ip", e); + LOG.warn("NetWorkUtil can't transfer ip(" + ip + ") to hostname, return ip", e); return ip; } return address.getHostName(); @@ -168,13 +165,13 @@ public class NetWorkUtils { return StringUtils.equalsIgnoreCase(ip1, ip2); } - + public static void main(String[] args) { - List<String> 
servers = new ArrayList<String>(); - servers.add("localhost"); - - System.out.println(host2Ip(servers)); - + List<String> servers = new ArrayList<String>(); + servers.add("localhost"); + + System.out.println(host2Ip(servers)); + } } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/utils/OSInfo.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/OSInfo.java b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/OSInfo.java index d4f6e0f..f5acda7 100644 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/OSInfo.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/OSInfo.java @@ -17,141 +17,144 @@ */ package com.alibaba.jstorm.utils; -public class OSInfo { - - private static String OS = System.getProperty("os.name").toLowerCase(); - - private static OSInfo _instance = new OSInfo(); - - private EPlatform platform; - - private OSInfo(){} - - public static boolean isLinux(){ - return OS.indexOf("linux")>=0; - } - - public static boolean isMacOS(){ - return OS.indexOf("mac")>=0&&OS.indexOf("os")>0&&OS.indexOf("x")<0; - } - - public static boolean isMacOSX(){ - return OS.indexOf("mac")>=0&&OS.indexOf("os")>0&&OS.indexOf("x")>0; - } - +public class OSInfo { + + private static String OS = System.getProperty("os.name").toLowerCase(); + + private static OSInfo _instance = new OSInfo(); + + private EPlatform platform; + + private OSInfo() { + } + + public static boolean isLinux() { + return OS.indexOf("linux") >= 0; + } + + public static boolean isMacOS() { + return OS.indexOf("mac") >= 0 && OS.indexOf("os") > 0 && OS.indexOf("x") < 0; + } + + public static boolean isMacOSX() { + return OS.indexOf("mac") >= 0 && OS.indexOf("os") > 0 && OS.indexOf("x") > 0; + } + public static boolean isMac() { - return OS.indexOf("mac")>=0&&OS.indexOf("os")>0; - } - - public static boolean isWindows(){ - return OS.indexOf("windows")>=0; - } - 
- public static boolean isOS2(){ - return OS.indexOf("os/2")>=0; - } - - public static boolean isSolaris(){ - return OS.indexOf("solaris")>=0; - } - - public static boolean isSunOS(){ - return OS.indexOf("sunos")>=0; - } - - public static boolean isMPEiX(){ - return OS.indexOf("mpe/ix")>=0; - } - - public static boolean isHPUX(){ - return OS.indexOf("hp-ux")>=0; - } - - public static boolean isAix(){ - return OS.indexOf("aix")>=0; - } - - public static boolean isOS390(){ - return OS.indexOf("os/390")>=0; - } - - public static boolean isFreeBSD(){ - return OS.indexOf("freebsd")>=0; - } - - public static boolean isIrix(){ - return OS.indexOf("irix")>=0; - } - - public static boolean isDigitalUnix(){ - return OS.indexOf("digital")>=0&&OS.indexOf("unix")>0; - } - - public static boolean isNetWare(){ - return OS.indexOf("netware")>=0; - } - - public static boolean isOSF1(){ - return OS.indexOf("osf1")>=0; - } - - public static boolean isOpenVMS(){ - return OS.indexOf("openvms")>=0; - } - - /** - * Get OS name - * @return OS name - */ - public static EPlatform getOSname(){ - if(isAix()){ - _instance.platform = EPlatform.AIX; - }else if (isDigitalUnix()) { - _instance.platform = EPlatform.Digital_Unix; - }else if (isFreeBSD()) { - _instance.platform = EPlatform.FreeBSD; - }else if (isHPUX()) { - _instance.platform = EPlatform.HP_UX; - }else if (isIrix()) { - _instance.platform = EPlatform.Irix; - }else if (isLinux()) { - _instance.platform = EPlatform.Linux; - }else if (isMacOS()) { - _instance.platform = EPlatform.Mac_OS; - }else if (isMacOSX()) { - _instance.platform = EPlatform.Mac_OS_X; - }else if (isMPEiX()) { - _instance.platform = EPlatform.MPEiX; - }else if (isNetWare()) { - _instance.platform = EPlatform.NetWare_411; - }else if (isOpenVMS()) { - _instance.platform = EPlatform.OpenVMS; - }else if (isOS2()) { - _instance.platform = EPlatform.OS2; - }else if (isOS390()) { - _instance.platform = EPlatform.OS390; - }else if (isOSF1()) { - _instance.platform = 
EPlatform.OSF1; - }else if (isSolaris()) { - _instance.platform = EPlatform.Solaris; - }else if (isSunOS()) { - _instance.platform = EPlatform.SunOS; - }else if (isWindows()) { - _instance.platform = EPlatform.Windows; - }else{ - _instance.platform = EPlatform.Others; - } - return _instance.platform; - } - /** - * @param args - */ - public static void main(String[] args) { - System.out.println( System.getProperty("os.name") ); - System.out.println( System.getProperty("os.version") ); - System.out.println( System.getProperty("os.arch") ); - - System.out.println(OSInfo.getOSname()); - } - -} + return OS.indexOf("mac") >= 0 && OS.indexOf("os") > 0; + } + + public static boolean isWindows() { + return OS.indexOf("windows") >= 0; + } + + public static boolean isOS2() { + return OS.indexOf("os/2") >= 0; + } + + public static boolean isSolaris() { + return OS.indexOf("solaris") >= 0; + } + + public static boolean isSunOS() { + return OS.indexOf("sunos") >= 0; + } + + public static boolean isMPEiX() { + return OS.indexOf("mpe/ix") >= 0; + } + + public static boolean isHPUX() { + return OS.indexOf("hp-ux") >= 0; + } + + public static boolean isAix() { + return OS.indexOf("aix") >= 0; + } + + public static boolean isOS390() { + return OS.indexOf("os/390") >= 0; + } + + public static boolean isFreeBSD() { + return OS.indexOf("freebsd") >= 0; + } + + public static boolean isIrix() { + return OS.indexOf("irix") >= 0; + } + + public static boolean isDigitalUnix() { + return OS.indexOf("digital") >= 0 && OS.indexOf("unix") > 0; + } + + public static boolean isNetWare() { + return OS.indexOf("netware") >= 0; + } + + public static boolean isOSF1() { + return OS.indexOf("osf1") >= 0; + } + + public static boolean isOpenVMS() { + return OS.indexOf("openvms") >= 0; + } + + /** + * Get OS name + * + * @return OS name + */ + public static EPlatform getOSname() { + if (isAix()) { + _instance.platform = EPlatform.AIX; + } else if (isDigitalUnix()) { + _instance.platform = 
EPlatform.Digital_Unix; + } else if (isFreeBSD()) { + _instance.platform = EPlatform.FreeBSD; + } else if (isHPUX()) { + _instance.platform = EPlatform.HP_UX; + } else if (isIrix()) { + _instance.platform = EPlatform.Irix; + } else if (isLinux()) { + _instance.platform = EPlatform.Linux; + } else if (isMacOS()) { + _instance.platform = EPlatform.Mac_OS; + } else if (isMacOSX()) { + _instance.platform = EPlatform.Mac_OS_X; + } else if (isMPEiX()) { + _instance.platform = EPlatform.MPEiX; + } else if (isNetWare()) { + _instance.platform = EPlatform.NetWare_411; + } else if (isOpenVMS()) { + _instance.platform = EPlatform.OpenVMS; + } else if (isOS2()) { + _instance.platform = EPlatform.OS2; + } else if (isOS390()) { + _instance.platform = EPlatform.OS390; + } else if (isOSF1()) { + _instance.platform = EPlatform.OSF1; + } else if (isSolaris()) { + _instance.platform = EPlatform.Solaris; + } else if (isSunOS()) { + _instance.platform = EPlatform.SunOS; + } else if (isWindows()) { + _instance.platform = EPlatform.Windows; + } else { + _instance.platform = EPlatform.Others; + } + return _instance.platform; + } + + /** + * @param args + */ + public static void main(String[] args) { + System.out.println(System.getProperty("os.name")); + System.out.println(System.getProperty("os.version")); + System.out.println(System.getProperty("os.arch")); + + System.out.println(OSInfo.getOSname()); + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/utils/OlderFileFilter.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/OlderFileFilter.java b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/OlderFileFilter.java index 13b1d98..30c2726 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/OlderFileFilter.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/OlderFileFilter.java @@ -39,8 +39,25 @@ public class 
OlderFileFilter implements FileFilter { long current_time = System.currentTimeMillis(); - return (pathname.isFile() && (pathname.lastModified() + seconds * 1000 <= current_time)) - || pathname.isDirectory(); + return (pathname.lastModified() + seconds * 1000 <= current_time) ; + } + + + public static void main(String[] args) { + long current_time = System.currentTimeMillis(); + String test = "test"; + + + File file = new File(test); + file.delete(); + file.mkdir(); + file.setLastModified(current_time); + + JStormUtils.sleepMs(10 * 1000); + + File newFile = new File(test); + System.out.println("modify time: " + newFile.lastModified() + ", raw:" + current_time); + } } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/utils/Pair.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/Pair.java b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/Pair.java index 49d35d6..1bc8b56 100644 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/Pair.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/Pair.java @@ -43,11 +43,11 @@ public class Pair<F, S> { } @Override - public String toString(){ + public String toString() { StringBuilder sb = new StringBuilder(); - sb.append("first:"+ first); + sb.append("first:" + first); sb.append(":"); - sb.append("sencond:"+ second); + sb.append("sencond:" + second); return sb.toString(); } } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/utils/PathUtils.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/PathUtils.java b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/PathUtils.java index 939b81b..b3732dc 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/PathUtils.java +++ 
b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/PathUtils.java @@ -44,7 +44,7 @@ public class PathUtils { */ public static List<String> tokenize_path(String path) { String[] toks = path.split(SEPERATOR); - java.util.ArrayList<String> rtn = new ArrayList<String>(); + ArrayList<String> rtn = new ArrayList<String>(); for (String str : toks) { if (!str.isEmpty()) { rtn.add(str); http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/utils/RandomRange.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/RandomRange.java b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/RandomRange.java index e3be73f..20b9535 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/RandomRange.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/RandomRange.java @@ -20,8 +20,7 @@ package com.alibaba.jstorm.utils; import java.util.ArrayList; /** - * Shuffle the Range, This class is used in shuffle grouping, it is better than - * random, which can't make sure balance. + * Shuffle the Range, This class is used in shuffle grouping, it is better than random, which can't make sure balance. * * @author yannian * http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/utils/RotatingMap.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/RotatingMap.java b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/RotatingMap.java index 454e987..877e1d6 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/RotatingMap.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/RotatingMap.java @@ -28,9 +28,8 @@ import java.util.concurrent.LinkedBlockingDeque; /** * RotatingMap must be used under thread-safe environment * - * Expires keys that have not been updated in the configured number of seconds. 
- * The algorithm used will take between expirationSecs and expirationSecs * (1 + - * 1 / (numBuckets-1)) to actually expire the message. + * Expires keys that have not been updated in the configured number of seconds. The algorithm used will take between expirationSecs and expirationSecs * (1 + 1 + * / (numBuckets-1)) to actually expire the message. * * get, put, remove, containsKey, and size take O(numBuckets) time to run. * @@ -45,8 +44,7 @@ public class RotatingMap<K, V> implements TimeOutMap<K, V> { private final Object lock = new Object(); - public RotatingMap(int numBuckets, ExpiredCallback<K, V> callback, - boolean isSingleThread) { + public RotatingMap(int numBuckets, ExpiredCallback<K, V> callback, boolean isSingleThread) { if (numBuckets < 2) { throw new IllegalArgumentException("numBuckets must be >= 2"); } @@ -121,8 +119,7 @@ public class RotatingMap<K, V> implements TimeOutMap<K, V> { /** * Remove item from Rotate * - * On the side of performance, scanning from header is faster On the side of - * logic, it should scan from the end to first. + * On the side of performance, scanning from header is faster On the side of logic, it should scan from the end to first. 
* * @param key * @return http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/utils/SystemOperation.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/SystemOperation.java b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/SystemOperation.java index ba7547b..5bc2252 100644 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/SystemOperation.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/SystemOperation.java @@ -25,20 +25,16 @@ import org.slf4j.LoggerFactory; public class SystemOperation { - public static final Logger LOG = LoggerFactory - .getLogger(SystemOperation.class); + public static final Logger LOG = LoggerFactory.getLogger(SystemOperation.class); public static boolean isRoot() throws IOException { String result = SystemOperation.exec("echo $EUID").substring(0, 1); - return Integer.valueOf(result.substring(0, result.length())).intValue() == 0 ? true - : false; + return Integer.valueOf(result.substring(0, result.length())).intValue() == 0 ? 
true : false; }; - public static void mount(String name, String target, String type, - String data) throws IOException { + public static void mount(String name, String target, String type, String data) throws IOException { StringBuilder sb = new StringBuilder(); - sb.append("mount -t ").append(type).append(" -o ").append(data) - .append(" ").append(name).append(" ").append(target); + sb.append("mount -t ").append(type).append(" -o ").append(data).append(" ").append(name).append(" ").append(target); SystemOperation.exec(sb.toString()); } @@ -50,9 +46,7 @@ public class SystemOperation { public static String exec(String cmd) throws IOException { LOG.debug("Shell cmd: " + cmd); - Process process = - new ProcessBuilder(new String[] { "/bin/bash", "-c", cmd }) - .start(); + Process process = new ProcessBuilder(new String[] { "/bin/bash", "-c", cmd }).start(); try { process.waitFor(); String output = IOUtils.toString(process.getInputStream()); http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/utils/Thrift.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/Thrift.java b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/Thrift.java index c55751c..5116c4f 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/Thrift.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/Thrift.java @@ -17,34 +17,22 @@ */ package com.alibaba.jstorm.utils; -import java.lang.reflect.Constructor; -import java.nio.ByteBuffer; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import backtype.storm.generated.Bolt; -import backtype.storm.generated.ComponentCommon; -import backtype.storm.generated.ComponentObject; -import backtype.storm.generated.GlobalStreamId; -import backtype.storm.generated.Grouping; -import 
backtype.storm.generated.JavaObject; -import backtype.storm.generated.JavaObjectArg; -import backtype.storm.generated.NullStruct; -import backtype.storm.generated.StormTopology; +import backtype.storm.generated.*; import backtype.storm.generated.StormTopology._Fields; -import backtype.storm.generated.StreamInfo; -import backtype.storm.generated.TopologyInitialStatus; import backtype.storm.grouping.CustomStreamGrouping; import backtype.storm.task.IBolt; import backtype.storm.utils.Utils; - import com.alibaba.jstorm.cluster.StormStatus; import com.alibaba.jstorm.daemon.nimbus.StatusType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.reflect.Constructor; +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; /** * Thrift utils @@ -57,8 +45,7 @@ import com.alibaba.jstorm.daemon.nimbus.StatusType; public class Thrift { private static Logger LOG = LoggerFactory.getLogger(Thrift.class); - public static StormStatus topologyInitialStatusToStormStatus( - TopologyInitialStatus tStatus) { + public static StormStatus topologyInitialStatusToStormStatus(TopologyInitialStatus tStatus) { if (tStatus.equals(TopologyInitialStatus.ACTIVE)) { return new StormStatus(StatusType.active); } else { @@ -79,16 +66,13 @@ public class Thrift { paraTypes[i] = Integer.class; } else if (arg.getSetField().equals(JavaObjectArg._Fields.LONG_ARG)) { paraTypes[i] = Long.class; - } else if (arg.getSetField().equals( - JavaObjectArg._Fields.STRING_ARG)) { + } else if (arg.getSetField().equals(JavaObjectArg._Fields.STRING_ARG)) { paraTypes[i] = String.class; } else if (arg.getSetField().equals(JavaObjectArg._Fields.BOOL_ARG)) { paraTypes[i] = Boolean.class; - } else if (arg.getSetField().equals( - JavaObjectArg._Fields.BINARY_ARG)) { + } else if (arg.getSetField().equals(JavaObjectArg._Fields.BINARY_ARG)) { paraTypes[i] = ByteBuffer.class; - } else if (arg.getSetField().equals( - 
JavaObjectArg._Fields.DOUBLE_ARG)) { + } else if (arg.getSetField().equals(JavaObjectArg._Fields.DOUBLE_ARG)) { paraTypes[i] = Double.class; } else { paraTypes[i] = Object.class; @@ -113,8 +97,7 @@ public class Thrift { public static List<String> fieldGrouping(Grouping grouping) { if (!Grouping._Fields.FIELDS.equals(groupingType(grouping))) { - throw new IllegalArgumentException( - "Tried to get grouping fields from non fields grouping"); + throw new IllegalArgumentException("Tried to get grouping fields from non fields grouping"); } return grouping.get_fields(); @@ -152,9 +135,11 @@ public class Thrift { return Grouping.direct(new NullStruct()); } - private static ComponentCommon mkComponentcommon( - Map<GlobalStreamId, Grouping> inputs, - HashMap<String, StreamInfo> output_spec, Integer parallelism_hint) { + public static Grouping mkAllGrouping() { + return Grouping.all(new NullStruct()); + } + + private static ComponentCommon mkComponentcommon(Map<GlobalStreamId, Grouping> inputs, HashMap<String, StreamInfo> output_spec, Integer parallelism_hint) { ComponentCommon ret = new ComponentCommon(inputs, output_spec); if (parallelism_hint != null) { ret.set_parallelism_hint(parallelism_hint); @@ -162,8 +147,7 @@ public class Thrift { return ret; } - public static Bolt mkBolt(Map<GlobalStreamId, Grouping> inputs, IBolt bolt, - HashMap<String, StreamInfo> output, Integer p) { + public static Bolt mkBolt(Map<GlobalStreamId, Grouping> inputs, IBolt bolt, HashMap<String, StreamInfo> output, Integer p) { ComponentCommon common = mkComponentcommon(inputs, output, p); byte[] boltSer = Utils.serialize(bolt); ComponentObject component = ComponentObject.serialized_java(boltSer); @@ -171,8 +155,7 @@ public class Thrift { } public static StormTopology._Fields[] STORM_TOPOLOGY_FIELDS = null; - public static StormTopology._Fields[] SPOUT_FIELDS = { - StormTopology._Fields.SPOUTS, StormTopology._Fields.STATE_SPOUTS }; + public static StormTopology._Fields[] SPOUT_FIELDS = { 
StormTopology._Fields.SPOUTS, StormTopology._Fields.STATE_SPOUTS }; static { Set<_Fields> keys = StormTopology.metaDataMap.keySet(); STORM_TOPOLOGY_FIELDS = new StormTopology._Fields[keys.size()]; http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/utils/TimeCacheMap.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/TimeCacheMap.java b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/TimeCacheMap.java index c56e307..4d2ea0f 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/TimeCacheMap.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/TimeCacheMap.java @@ -24,9 +24,8 @@ import java.util.Map; import java.util.Map.Entry; /** - * Expires keys that have not been updated in the configured number of seconds. - * The algorithm used will take between expirationSecs and expirationSecs * (1 + - * 1 / (numBuckets-1)) to actually expire the message. + * Expires keys that have not been updated in the configured number of seconds. The algorithm used will take between expirationSecs and expirationSecs * (1 + 1 + * / (numBuckets-1)) to actually expire the message. * * get, put, remove, containsKey, and size take O(numBuckets) time to run. 
* @@ -42,8 +41,7 @@ public class TimeCacheMap<K, V> implements TimeOutMap<K, V> { private Thread _cleaner; private ExpiredCallback _callback; - public TimeCacheMap(int expirationSecs, int numBuckets, - ExpiredCallback<K, V> callback) { + public TimeCacheMap(int expirationSecs, int numBuckets, ExpiredCallback<K, V> callback) { if (numBuckets < 2) { throw new IllegalArgumentException("numBuckets must be >= 2"); } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/utils/TimeCacheQueue.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/TimeCacheQueue.java b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/TimeCacheQueue.java index 8468310..00e5cf3 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/TimeCacheQueue.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/TimeCacheQueue.java @@ -25,14 +25,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Expires keys that have not been updated in the configured number of seconds. - * The algorithm used will take between expirationSecs and expirationSecs * (1 + - * 1 / (numBuckets-1)) to actually expire the message. + * Expires keys that have not been updated in the configured number of seconds. The algorithm used will take between expirationSecs and expirationSecs * (1 + 1 + * / (numBuckets-1)) to actually expire the message. * * get, put, remove, containsKey, and size take O(numBuckets) time to run. 
* - * The advantage of this design is that the expiration thread only locks the - * object for O(1) time, meaning the object is essentially always available for + * The advantage of this design is that the expiration thread only locks the object for O(1) time, meaning the object is essentially always available for * poll/offer */ public class TimeCacheQueue<K> { @@ -44,8 +42,7 @@ public class TimeCacheQueue<K> { } public static class DefaultExpiredCallback<K> implements ExpiredCallback<K> { - protected static final Logger LOG = LoggerFactory - .getLogger(TimeCacheQueue.DefaultExpiredCallback.class); + protected static final Logger LOG = LoggerFactory.getLogger(DefaultExpiredCallback.class); protected String queueName; @@ -54,8 +51,7 @@ public class TimeCacheQueue<K> { } public void expire(K entry) { - LOG.info("TimeCacheQueue " + queueName + " entry:" + entry - + ", timeout"); + LOG.info("TimeCacheQueue " + queueName + " entry:" + entry + ", timeout"); } } @@ -65,8 +61,7 @@ public class TimeCacheQueue<K> { protected Thread _cleaner; protected ExpiredCallback _callback; - public TimeCacheQueue(int expirationSecs, int numBuckets, - ExpiredCallback<K> callback) { + public TimeCacheQueue(int expirationSecs, int numBuckets, ExpiredCallback<K> callback) { if (numBuckets < 2) { throw new IllegalArgumentException("numBuckets must be >= 2"); } @@ -130,8 +125,7 @@ public class TimeCacheQueue<K> { public K poll() { synchronized (_lock) { - Iterator<LinkedBlockingDeque<K>> itor = - _buckets.descendingIterator(); + Iterator<LinkedBlockingDeque<K>> itor = _buckets.descendingIterator(); while (itor.hasNext()) { LinkedBlockingDeque<K> bucket = itor.next(); K entry = bucket.poll(); http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/utils/TimeFormat.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/TimeFormat.java 
b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/TimeFormat.java index a5c189f..fbae631 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/TimeFormat.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/TimeFormat.java @@ -40,29 +40,24 @@ public class TimeFormat { public static final long ONE_DAY_HOURS = 24; - public static final long ONE_MINUTE_MILLISECONDS = ONE_MINUTE_SECONDS - * ONE_SECOND_MILLISECONDS; + public static final long ONE_MINUTE_MILLISECONDS = ONE_MINUTE_SECONDS * ONE_SECOND_MILLISECONDS; - public static final long ONE_HOUR_MILLISECONDS = ONE_HOUR_MINUTES - * ONE_MINUTE_MILLISECONDS; + public static final long ONE_HOUR_MILLISECONDS = ONE_HOUR_MINUTES * ONE_MINUTE_MILLISECONDS; - public static final long ONE_DAY_MILLISECONDS = ONE_DAY_HOURS - * ONE_HOUR_MILLISECONDS; + public static final long ONE_DAY_MILLISECONDS = ONE_DAY_HOURS * ONE_HOUR_MILLISECONDS; public static Date convertDate(String dateStr, String format) { Date date = null; try { if (format != null) { - SimpleDateFormat simpleDateFormat = - new SimpleDateFormat(format); + SimpleDateFormat simpleDateFormat = new SimpleDateFormat(format); date = simpleDateFormat.parse(dateStr); } else { date = new Date(dateStr); } } catch (Exception ex) { - log.error("Failed to convert " + dateStr + " to Date, format:" - + format); + log.error("Failed to convert " + dateStr + " to Date, format:" + format); return null; } return date; @@ -77,8 +72,7 @@ public class TimeFormat { ret = sdf.format(date); } catch (Exception e) { - log.error("Failed to convert " + date + " to String, format:" - + format); + log.error("Failed to convert " + date + " to String, format:" + format); return null; } return ret; @@ -207,12 +201,9 @@ public class TimeFormat { tomorrow.set(Calendar.MINUTE, 0); Date startTime = tomorrow.getTime(); - long hourdiff = - (startTime.getTime() - current.getTime()) - / ONE_HOUR_MILLISECONDS; + long hourdiff = (startTime.getTime() - current.getTime()) / 
ONE_HOUR_MILLISECONDS; - System.out.println("Current:" + current + ", tomorrow" + startTime - + ", diff hour" + hourdiff); + System.out.println("Current:" + current + ", tomorrow" + startTime + ", diff hour" + hourdiff); } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/utils/TimeUtils.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/TimeUtils.java b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/TimeUtils.java index 8c9bd3d..9068731 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/utils/TimeUtils.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/utils/TimeUtils.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * <p/> * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p/> * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
@@ -18,19 +18,23 @@ package com.alibaba.jstorm.utils; import backtype.storm.utils.Time; +import com.alibaba.jstorm.metric.AsmWindow; + +import java.text.SimpleDateFormat; +import java.util.Calendar; +import java.util.Date; /** * Time utils - * + * * @author yannian - * */ public class TimeUtils { + public static final long NS_PER_MS = 1000000L; + /** * Take care of int overflow - * - * @return */ public static int current_time_secs() { return (int) (Time.currentTimeMillis() / 1000); @@ -38,8 +42,6 @@ public class TimeUtils { /** * Take care of int overflow - * - * @return */ public static int time_delta(int time_secs) { return current_time_secs() - time_secs; @@ -48,4 +50,91 @@ public class TimeUtils { public static long time_delta_ms(long time_ms) { return System.currentTimeMillis() - time_ms; } + + public static final long NS_PER_US = 1000l; + + public static final int ONE_SEC = 1; + public static final int SEC_PER_MIN = 60; + public static final int SEC_PER_DAY = 86400; + + public static boolean isTimeAligned() { + return current_time_secs() % SEC_PER_DAY % SEC_PER_MIN == 0; + } + + public static int secOffset() { + return current_time_secs() % SEC_PER_DAY % SEC_PER_MIN; + } + + public static int secOffset(long ts) { + return (int) (ts % SEC_PER_DAY % SEC_PER_MIN); + } + + public static int winSecOffset(long ts, int window) { + return (int) (ts / 1000 % SEC_PER_DAY % window); + } + + public static long alignTimeToWin(long ts, int win) { + if (win != AsmWindow.D1_WINDOW) { + long curTimeSec = ts / 1000; + return (curTimeSec - curTimeSec % win) * 1000; + } else { + Calendar cal = Calendar.getInstance(); + cal.setTimeInMillis(ts); + int year = cal.get(Calendar.YEAR); + int month = cal.get(Calendar.MONTH); + int day = cal.get(Calendar.DAY_OF_MONTH); + int hour = cal.get(Calendar.HOUR); + int min = cal.get(Calendar.MINUTE); + int sec = cal.get(Calendar.SECOND); + if (sec + min + hour > 0) { + cal.set(year, month, day + 1, 0, 0, 0); + } + return cal.getTimeInMillis(); 
+ } + } + + public static long alignTimeToMin(long ts) { + return alignTimeToWin(ts, AsmWindow.M1_WINDOW); + } + + public static String toTimeStr(Date time) { + int hour = time.getHours(); + int min = time.getMinutes(); + StringBuilder sb = new StringBuilder(); + if (hour < 10) { + sb.append(0).append(hour); + } else { + sb.append(hour); + } + sb.append(":"); + if (min < 10) { + sb.append(0).append(min); + } else { + sb.append(min); + } + return sb.toString(); + } + + public static String format(int curTimeSec) { + return format(new Date(curTimeSec * 1000L), "yyyy-MM-dd HH:mm:ss"); + } + + public static String format(Date time, String fmt) { + SimpleDateFormat df = new SimpleDateFormat(fmt); + return df.format(time); + } + + + public static void main(String[] args) throws Exception { + System.out.println(new Date(alignTimeToWin(System.currentTimeMillis(), AsmWindow.M1_WINDOW))); + System.out.println(new Date(alignTimeToWin(System.currentTimeMillis(), AsmWindow.M10_WINDOW))); + System.out.println(new Date(alignTimeToWin(System.currentTimeMillis(), AsmWindow.H2_WINDOW))); + System.out.println(new Date(alignTimeToWin(System.currentTimeMillis(), AsmWindow.D1_WINDOW))); + + Calendar cal = Calendar.getInstance(); + cal.set(2015, 6, 23, 0, 0, 0); + System.out.println(new Date(alignTimeToWin(cal.getTimeInMillis(), AsmWindow.D1_WINDOW))); + + System.out.println(format(TimeUtils.current_time_secs())); + } } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/zk/ZkEventTypes.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/zk/ZkEventTypes.java b/jstorm-core/src/main/java/com/alibaba/jstorm/zk/ZkEventTypes.java index 09c25a5..eab0212 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/zk/ZkEventTypes.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/zk/ZkEventTypes.java @@ -32,8 +32,7 @@ public class ZkEventTypes { 
map.put(Watcher.Event.EventType.NodeCreated, ":node-created"); map.put(Watcher.Event.EventType.NodeDeleted, ":node-deleted"); map.put(Watcher.Event.EventType.NodeDataChanged, ":node-data-changed"); - map.put(Watcher.Event.EventType.NodeChildrenChanged, - ":node-children-changed"); + map.put(Watcher.Event.EventType.NodeChildrenChanged, ":node-children-changed"); } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/com/alibaba/jstorm/zk/ZkTool.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/com/alibaba/jstorm/zk/ZkTool.java b/jstorm-core/src/main/java/com/alibaba/jstorm/zk/ZkTool.java index b726781..a098730 100755 --- a/jstorm-core/src/main/java/com/alibaba/jstorm/zk/ZkTool.java +++ b/jstorm-core/src/main/java/com/alibaba/jstorm/zk/ZkTool.java @@ -20,34 +20,44 @@ package com.alibaba.jstorm.zk; import java.util.List; import java.util.Map; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import com.alibaba.jstorm.cluster.Cluster; +import org.apache.log4j.Logger; import backtype.storm.Config; import backtype.storm.utils.Utils; -import com.alibaba.jstorm.cluster.Cluster; import com.alibaba.jstorm.cluster.ClusterState; import com.alibaba.jstorm.cluster.DistributedClusterState; import com.google.common.collect.Maps; public class ZkTool { - private static Logger LOG = LoggerFactory.getLogger(ZkTool.class); + private static Logger LOG = Logger.getLogger(ZkTool.class); public static final String READ_CMD = "read"; public static final String RM_CMD = "rm"; + public static final String LIST_CMD = "list"; + + public static final String CLEAN_CMD = "clean"; + public static void usage() { LOG.info("Read ZK node's data, please do as following:"); LOG.info(ZkTool.class.getName() + " read zkpath"); LOG.info("\nDelete topology backup assignment, please do as following:"); LOG.info(ZkTool.class.getName() + " rm topologyname"); + + LOG.info("\nlist subdirectory of zkPath , 
please do as following:"); + LOG.info(ZkTool.class.getName() + " list zkpath"); + + LOG.info("\nDelete all nodes about a topologyId of zk , please do as following:"); + LOG.info(ZkTool.class.getName() + " clean topologyId"); + + } public static String getData(DistributedClusterState zkClusterState, - String path) throws Exception { + String path) throws Exception { byte[] data = zkClusterState.get_data(path, false); if (data == null || data.length == 0) { return null; @@ -58,6 +68,135 @@ public class ZkTool { return obj.toString(); } + + public static void list(String path) { + DistributedClusterState zkClusterState = null; + + try { + conf.put(Config.STORM_ZOOKEEPER_ROOT, "/"); + + zkClusterState = new DistributedClusterState(conf); + + List<String> children = zkClusterState.get_children(path, false); + if (children == null || children.isEmpty() ) { + LOG.info("No children of " + path); + } + else + { + StringBuilder sb = new StringBuilder(); + sb.append("Zk node children of " + path + "\n"); + for (String str : children){ + sb.append(" " + str + ","); + } + sb.append("\n"); + LOG.info(sb.toString()); + } + } catch (Exception e) { + if (zkClusterState == null) { + LOG.error("Failed to connect ZK ", e); + } else { + LOG.error("Failed to list children of " + path + "\n", e); + } + } finally { + if (zkClusterState != null) { + zkClusterState.close(); + } + } + } + /** + * Warning! 
This method can't delete zkCache right now because of + * new DistributedClusterState(conf) + */ + public static void cleanTopology( String topologyId){ + DistributedClusterState zkClusterState = null; + try { + zkClusterState = new DistributedClusterState(conf); + String rootDir = String.valueOf(conf.get(Config.STORM_ZOOKEEPER_ROOT)); + String assignmentPath = "/assignments/"+ topologyId; + String stormBase = "/topology/"+ topologyId; + String taskbeats = "/taskbeats/"+ topologyId; + String tasks = "/tasks/"+ topologyId; + String taskerrors = "/taskerrors/"+ topologyId; + String monitor = "/monitor/"+ topologyId; + if (zkClusterState.node_existed(assignmentPath, false)){ + try { + zkClusterState.delete_node(assignmentPath); + } catch (Exception e) { + LOG.error("Could not remove assignments for " + topologyId, e); + } + }else { + LOG.info(" node of " + rootDir + assignmentPath + " isn't existed "); + + } + + if (zkClusterState.node_existed(stormBase, false)){ + try { + zkClusterState.delete_node(stormBase); + } catch (Exception e) { + LOG.error("Failed to remove storm base for " + topologyId, e); + } + }else { + LOG.info(" node of " + rootDir + stormBase + " isn't existed "); + + } + + if (zkClusterState.node_existed(taskbeats, false)){ + try { + zkClusterState.delete_node(taskbeats); + } catch (Exception e) { + LOG.error("Failed to remove taskbeats for " + topologyId, e); + } + }else { + LOG.info(" node of " + rootDir + taskbeats + " isn't existed "); + + } + + if (zkClusterState.node_existed(tasks, false)){ + try { + zkClusterState.delete_node(tasks); + } catch (Exception e) { + LOG.error("Failed to remove tasks for " + topologyId, e); + } + }else { + LOG.info(" node of " + rootDir + tasks + " isn't existed "); + + } + + if (zkClusterState.node_existed(taskerrors, false)){ + try { + zkClusterState.delete_node(taskerrors); + } catch (Exception e) { + LOG.error("Failed to remove taskerrors for " + topologyId, e); + } + }else { + LOG.info(" node of " + rootDir 
+ taskerrors + " isn't existed "); + + } + + if (zkClusterState.node_existed(monitor, false)){ + try { + zkClusterState.delete_node(monitor); + } catch (Exception e) { + LOG.error("Failed to remove monitor for " + topologyId, e); + } + }else { + LOG.info(" node of " + rootDir + monitor + " isn't existed "); + + } + } catch (Exception e) { + if (zkClusterState == null) { + LOG.error("Failed to connect ZK ", e); + } else { + LOG.error("Failed to clean topolodyId: " + topologyId + "\n", e); + } + } finally { + if (zkClusterState != null) { + zkClusterState.close(); + } + } + + } + public static void readData(String path) { DistributedClusterState zkClusterState = null; @@ -110,8 +249,7 @@ public class ZkTool { if (tid.equals(topologyName)) { LOG.info("Find backup " + topologyName); - String topologyPath = - Cluster.assignment_bak_path(topologyName); + String topologyPath = assignment_bak_path(topologyName); zkClusterState.delete_node(topologyPath); LOG.info("Successfully delete topology " + topologyName @@ -161,12 +299,21 @@ public class ZkTool { } else if (args[0].equalsIgnoreCase(RM_CMD)) { rmBakTopology(args[1]); + } else if (args[0].equalsIgnoreCase(LIST_CMD)) { + list(args[1]); + } else if (args[0].equalsIgnoreCase(CLEAN_CMD)) { + cleanTopology(args[1]); } } /*******************************************************************/ + public static String assignment_bak_path(String id) { + return Cluster.ASSIGNMENTS_BAK_SUBTREE + Cluster.ZK_SEPERATOR + + id; + } + @SuppressWarnings("rawtypes") public static ClusterState mk_distributed_cluster_state(Map _conf) throws Exception { @@ -177,7 +324,8 @@ public class ZkTool { throws Exception { Map<String, String> ret = Maps.newHashMap(); List<String> followers = - cluster_state.get_children(Cluster.NIMBUS_SLAVE_SUBTREE, false); + cluster_state.get_children(Cluster.NIMBUS_SLAVE_SUBTREE, + false); if (followers == null || followers.size() == 0) { return ret; }
