http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/metric/LoggingMetricsConsumer.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/metric/LoggingMetricsConsumer.java b/jstorm-core/src/main/java/backtype/storm/metric/LoggingMetricsConsumer.java index c1c7c0a..840a45d 100755 --- a/jstorm-core/src/main/java/backtype/storm/metric/LoggingMetricsConsumer.java +++ b/jstorm-core/src/main/java/backtype/storm/metric/LoggingMetricsConsumer.java @@ -45,28 +45,26 @@ public class LoggingMetricsConsumer implements IMetricsConsumer { public static final Logger LOG = LoggerFactory.getLogger(LoggingMetricsConsumer.class); @Override - public void prepare(Map stormConf, Object registrationArgument, TopologyContext context, IErrorReporter errorReporter) { } + public void prepare(Map stormConf, Object registrationArgument, TopologyContext context, IErrorReporter errorReporter) { + } static private String padding = " "; @Override public void handleDataPoints(TaskInfo taskInfo, Collection<DataPoint> dataPoints) { StringBuilder sb = new StringBuilder(); - String header = String.format("%d\t%15s:%-4d\t%3d:%-11s\t", - taskInfo.timestamp, - taskInfo.srcWorkerHost, taskInfo.srcWorkerPort, - taskInfo.srcTaskId, - taskInfo.srcComponentId); + String header = + String.format("%d\t%15s:%-4d\t%3d:%-11s\t", taskInfo.timestamp, taskInfo.srcWorkerHost, taskInfo.srcWorkerPort, taskInfo.srcTaskId, + taskInfo.srcComponentId); sb.append(header); for (DataPoint p : dataPoints) { sb.delete(header.length(), sb.length()); - sb.append(p.name) - .append(padding).delete(header.length()+23,sb.length()).append("\t") - .append(p.value); + sb.append(p.name).append(padding).delete(header.length() + 23, sb.length()).append("\t").append(p.value); LOG.info(sb.toString()); } } @Override - public void cleanup() { } + public void cleanup() { + } }
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/metric/MetricsConsumerBolt.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/metric/MetricsConsumerBolt.java b/jstorm-core/src/main/java/backtype/storm/metric/MetricsConsumerBolt.java index d8eb3bf..afbc7da 100755 --- a/jstorm-core/src/main/java/backtype/storm/metric/MetricsConsumerBolt.java +++ b/jstorm-core/src/main/java/backtype/storm/metric/MetricsConsumerBolt.java @@ -41,18 +41,18 @@ public class MetricsConsumerBolt implements IBolt { @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { try { - _metricsConsumer = (IMetricsConsumer)Class.forName(_consumerClassName).newInstance(); + _metricsConsumer = (IMetricsConsumer) Class.forName(_consumerClassName).newInstance(); } catch (Exception e) { - throw new RuntimeException("Could not instantiate a class listed in config under section " + - Config.TOPOLOGY_METRICS_CONSUMER_REGISTER + " with fully qualified name " + _consumerClassName, e); + throw new RuntimeException("Could not instantiate a class listed in config under section " + Config.TOPOLOGY_METRICS_CONSUMER_REGISTER + + " with fully qualified name " + _consumerClassName, e); } - _metricsConsumer.prepare(stormConf, _registrationArgument, context, (IErrorReporter)collector); + _metricsConsumer.prepare(stormConf, _registrationArgument, context, (IErrorReporter) collector); _collector = collector; } - + @Override public void execute(Tuple input) { - _metricsConsumer.handleDataPoints((IMetricsConsumer.TaskInfo)input.getValue(0), (Collection)input.getValue(1)); + _metricsConsumer.handleDataPoints((IMetricsConsumer.TaskInfo) input.getValue(0), (Collection) input.getValue(1)); _collector.ack(input); } @@ -60,5 +60,5 @@ public class MetricsConsumerBolt implements IBolt { public void cleanup() { _metricsConsumer.cleanup(); } - + } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/metric/SystemBolt.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/metric/SystemBolt.java b/jstorm-core/src/main/java/backtype/storm/metric/SystemBolt.java index 492bc2d..43551ad 100755 --- a/jstorm-core/src/main/java/backtype/storm/metric/SystemBolt.java +++ b/jstorm-core/src/main/java/backtype/storm/metric/SystemBolt.java @@ -35,7 +35,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; - // There is one task inside one executor for each worker of the topology. // TaskID is always -1, therefore you can only send-unanchored tuples to co-located SystemBolt. // This bolt was conceived to export worker stats via metrics api. @@ -45,12 +44,14 @@ public class SystemBolt implements IBolt { private static class MemoryUsageMetric implements IMetric { IFn _getUsage; + public MemoryUsageMetric(IFn getUsage) { _getUsage = getUsage; } + @Override public Object getValueAndReset() { - MemoryUsage memUsage = (MemoryUsage)_getUsage.invoke(); + MemoryUsage memUsage = (MemoryUsage) _getUsage.invoke(); HashMap m = new HashMap(); m.put("maxBytes", memUsage.getMax()); m.put("committedBytes", memUsage.getCommitted()); @@ -68,16 +69,18 @@ public class SystemBolt implements IBolt { GarbageCollectorMXBean _gcBean; Long _collectionCount; Long _collectionTime; + public GarbageCollectorMetric(GarbageCollectorMXBean gcBean) { _gcBean = gcBean; } + @Override public Object getValueAndReset() { Long collectionCountP = _gcBean.getCollectionCount(); Long collectionTimeP = _gcBean.getCollectionTime(); Map ret = null; - if(_collectionCount!=null && _collectionTime!=null) { + if (_collectionCount != null && _collectionTime != null) { ret = new HashMap(); ret.put("count", collectionCountP - _collectionCount); ret.put("timeMs", collectionTimeP - _collectionTime); @@ -91,7 +94,7 @@ public class SystemBolt implements IBolt { @Override public void prepare(final Map stormConf, TopologyContext context, OutputCollector collector) { - if(_prepareWasCalled && !"local".equals(stormConf.get(Config.STORM_CLUSTER_MODE))) { + if (_prepareWasCalled && !"local".equals(stormConf.get(Config.STORM_CLUSTER_MODE))) { throw new RuntimeException("A single worker should have 1 SystemBolt instance."); } _prepareWasCalled = true; @@ -103,14 +106,14 @@ public class SystemBolt implements IBolt { context.registerMetric("uptimeSecs", new IMetric() { @Override public Object getValueAndReset() { - return jvmRT.getUptime()/1000.0; + return jvmRT.getUptime() / 1000.0; } }, bucketSize); context.registerMetric("startTimeSecs", new IMetric() { @Override public Object getValueAndReset() { - return jvmRT.getStartTime()/1000.0; + return jvmRT.getStartTime() / 1000.0; } }, bucketSize); @@ -122,7 +125,8 @@ public class SystemBolt implements IBolt { if (doEvent) { doEvent = false; return 1; - } else return 0; + } else + return 0; } }, bucketSize); @@ -139,7 +143,7 @@ public class SystemBolt implements IBolt { } }), bucketSize); - for(GarbageCollectorMXBean b : ManagementFactory.getGarbageCollectorMXBeans()) { + for (GarbageCollectorMXBean b : ManagementFactory.getGarbageCollectorMXBeans()) { context.registerMetric("GC/" + b.getName().replaceAll("\\W", ""), new GarbageCollectorMetric(b), bucketSize); } } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/metric/api/CombinedMetric.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/metric/api/CombinedMetric.java b/jstorm-core/src/main/java/backtype/storm/metric/api/CombinedMetric.java index 5764a25..a840851 100755 --- a/jstorm-core/src/main/java/backtype/storm/metric/api/CombinedMetric.java +++ b/jstorm-core/src/main/java/backtype/storm/metric/api/CombinedMetric.java @@ -25,7 +25,7 @@ public class CombinedMetric implements IMetric { _combiner = combiner; _value = _combiner.identity(); } - + public void update(Object value) { _value = _combiner.combine(_value, value); } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/metric/api/CountMetric.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/metric/api/CountMetric.java b/jstorm-core/src/main/java/backtype/storm/metric/api/CountMetric.java index dd048b8..82dc529 100755 --- a/jstorm-core/src/main/java/backtype/storm/metric/api/CountMetric.java +++ b/jstorm-core/src/main/java/backtype/storm/metric/api/CountMetric.java @@ -24,7 +24,7 @@ public class CountMetric implements IMetric { public CountMetric() { } - + public void incr() { _value++; } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/metric/api/ICombiner.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/metric/api/ICombiner.java b/jstorm-core/src/main/java/backtype/storm/metric/api/ICombiner.java index 04b3156..4dcb0dd 100755 --- a/jstorm-core/src/main/java/backtype/storm/metric/api/ICombiner.java +++ b/jstorm-core/src/main/java/backtype/storm/metric/api/ICombiner.java @@ -19,5 +19,6 @@ package backtype.storm.metric.api; public interface ICombiner<T> { public T identity(); + public T combine(T a, T b); } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/metric/api/IMetricsConsumer.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/metric/api/IMetricsConsumer.java b/jstorm-core/src/main/java/backtype/storm/metric/api/IMetricsConsumer.java index 14f1bf6..840bc6b 100755 --- a/jstorm-core/src/main/java/backtype/storm/metric/api/IMetricsConsumer.java +++ b/jstorm-core/src/main/java/backtype/storm/metric/api/IMetricsConsumer.java @@ -24,37 +24,47 @@ import java.util.Map; public interface IMetricsConsumer { public static class TaskInfo { - public TaskInfo() {} + public TaskInfo() { + } + public TaskInfo(String srcWorkerHost, int srcWorkerPort, String srcComponentId, int srcTaskId, long timestamp, int updateIntervalSecs) { this.srcWorkerHost = srcWorkerHost; this.srcWorkerPort = srcWorkerPort; - this.srcComponentId = srcComponentId; - this.srcTaskId = srcTaskId; + this.srcComponentId = srcComponentId; + this.srcTaskId = srcTaskId; this.timestamp = timestamp; - this.updateIntervalSecs = updateIntervalSecs; + this.updateIntervalSecs = updateIntervalSecs; } + public String srcWorkerHost; public int srcWorkerPort; - public String srcComponentId; - public int srcTaskId; + public String srcComponentId; + public int srcTaskId; public long timestamp; - public int updateIntervalSecs; + public int updateIntervalSecs; } + public static class DataPoint { - public DataPoint() {} + public DataPoint() { + } + public DataPoint(String name, Object value) { this.name = name; this.value = value; } + @Override public String toString() { return "[" + name + " = " + value + "]"; } - public String name; + + public String name; public Object value; } void prepare(Map stormConf, Object registrationArgument, TopologyContext context, IErrorReporter errorReporter); + void handleDataPoints(TaskInfo taskInfo, Collection<DataPoint> dataPoints); + void cleanup(); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/metric/api/IReducer.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/metric/api/IReducer.java b/jstorm-core/src/main/java/backtype/storm/metric/api/IReducer.java index a58df3b..403fe89 100755 --- a/jstorm-core/src/main/java/backtype/storm/metric/api/IReducer.java +++ b/jstorm-core/src/main/java/backtype/storm/metric/api/IReducer.java @@ -19,6 +19,8 @@ package backtype.storm.metric.api; public interface IReducer<T> { T init(); + T reduce(T accumulator, Object input); + Object extractResult(T accumulator); } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/metric/api/MeanReducer.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/metric/api/MeanReducer.java b/jstorm-core/src/main/java/backtype/storm/metric/api/MeanReducer.java index e25e26d..4138cab 100755 --- a/jstorm-core/src/main/java/backtype/storm/metric/api/MeanReducer.java +++ b/jstorm-core/src/main/java/backtype/storm/metric/api/MeanReducer.java @@ -31,23 +31,22 @@ public class MeanReducer implements IReducer<MeanReducerState> { public MeanReducerState reduce(MeanReducerState acc, Object input) { acc.count++; - if(input instanceof Double) { - acc.sum += (Double)input; - } else if(input instanceof Long) { - acc.sum += ((Long)input).doubleValue(); - } else if(input instanceof Integer) { - acc.sum += ((Integer)input).doubleValue(); + if (input instanceof Double) { + acc.sum += (Double) input; + } else if (input instanceof Long) { + acc.sum += ((Long) input).doubleValue(); + } else if (input instanceof Integer) { + acc.sum += ((Integer) input).doubleValue(); } else { - throw new RuntimeException( - "MeanReducer::reduce called with unsupported input type `" + input.getClass() - + "`. Supported types are Double, Long, Integer."); + throw new RuntimeException("MeanReducer::reduce called with unsupported input type `" + input.getClass() + + "`. Supported types are Double, Long, Integer."); } return acc; } public Object extractResult(MeanReducerState acc) { - if(acc.count > 0) { - return new Double(acc.sum / (double)acc.count); + if (acc.count > 0) { + return new Double(acc.sum / (double) acc.count); } else { return null; } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/metric/api/MultiCountMetric.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/metric/api/MultiCountMetric.java b/jstorm-core/src/main/java/backtype/storm/metric/api/MultiCountMetric.java index c420a16..eae7de3 100755 --- a/jstorm-core/src/main/java/backtype/storm/metric/api/MultiCountMetric.java +++ b/jstorm-core/src/main/java/backtype/storm/metric/api/MultiCountMetric.java @@ -26,10 +26,10 @@ public class MultiCountMetric implements IMetric { public MultiCountMetric() { } - + public CountMetric scope(String key) { CountMetric val = _value.get(key); - if(val == null) { + if (val == null) { _value.put(key, val = new CountMetric()); } return val; @@ -37,7 +37,7 @@ public class MultiCountMetric implements IMetric { public Object getValueAndReset() { Map ret = new HashMap(); - for(Map.Entry<String, CountMetric> e : _value.entrySet()) { + for (Map.Entry<String, CountMetric> e : _value.entrySet()) { ret.put(e.getKey(), e.getValue().getValueAndReset()); } return ret; http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/metric/api/MultiReducedMetric.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/metric/api/MultiReducedMetric.java b/jstorm-core/src/main/java/backtype/storm/metric/api/MultiReducedMetric.java index 530b168..09b26a7 100755 --- a/jstorm-core/src/main/java/backtype/storm/metric/api/MultiReducedMetric.java +++ b/jstorm-core/src/main/java/backtype/storm/metric/api/MultiReducedMetric.java @@ -28,10 +28,10 @@ public class MultiReducedMetric implements IMetric { public MultiReducedMetric(IReducer reducer) { _reducer = reducer; } - + public ReducedMetric scope(String key) { ReducedMetric val = _value.get(key); - if(val == null) { + if (val == null) { _value.put(key, val = new ReducedMetric(_reducer)); } return val; @@ -39,9 +39,9 @@ public class MultiReducedMetric implements IMetric { public Object getValueAndReset() { Map ret = new HashMap(); - for(Map.Entry<String, ReducedMetric> e : _value.entrySet()) { + for (Map.Entry<String, ReducedMetric> e : _value.entrySet()) { Object val = e.getValue().getValueAndReset(); - if(val != null) { + if (val != null) { ret.put(e.getKey(), val); } } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/metric/api/rpc/CountShellMetric.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/metric/api/rpc/CountShellMetric.java b/jstorm-core/src/main/java/backtype/storm/metric/api/rpc/CountShellMetric.java index def74c2..68d6a5c 100755 --- a/jstorm-core/src/main/java/backtype/storm/metric/api/rpc/CountShellMetric.java +++ b/jstorm-core/src/main/java/backtype/storm/metric/api/rpc/CountShellMetric.java @@ -21,16 +21,13 @@ import backtype.storm.metric.api.CountMetric; public class CountShellMetric extends CountMetric implements IShellMetric { /*** - * @param - * params should be null or long - * if value is null, it will call incr() - * if value is long, it will call incrBy((long)params) + * @param params should be null or long if value is null, it will call incr() if value is long, it will call incrBy((long)params) * */ public void updateMetricFromRPC(Object value) { if (value == null) { incr(); } else if (value instanceof Long) { - incrBy((Long)value); + incrBy((Long) value); } else { throw new RuntimeException("CountShellMetric updateMetricFromRPC params should be null or Long"); } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/metric/api/rpc/IShellMetric.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/metric/api/rpc/IShellMetric.java b/jstorm-core/src/main/java/backtype/storm/metric/api/rpc/IShellMetric.java index d53baea..4b2d97a 100755 --- a/jstorm-core/src/main/java/backtype/storm/metric/api/rpc/IShellMetric.java +++ b/jstorm-core/src/main/java/backtype/storm/metric/api/rpc/IShellMetric.java @@ -21,11 +21,9 @@ import backtype.storm.metric.api.IMetric; public interface IShellMetric extends IMetric { /*** - * @function - * This interface is used by ShellBolt and ShellSpout through RPC call to update Metric - * @param - * value used to update metric, its's meaning change according implementation - * Object can be any json support types: String, Long, Double, Boolean, Null, List, Map + * @function This interface is used by ShellBolt and ShellSpout through RPC call to update Metric + * @param value used to update metric, its's meaning change according implementation Object can be any json support types: String, Long, Double, Boolean, + * Null, List, Map * */ public void updateMetricFromRPC(Object value); } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/multilang/BoltMsg.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/multilang/BoltMsg.java b/jstorm-core/src/main/java/backtype/storm/multilang/BoltMsg.java index 446bdc4..d1eee4e 100755 --- a/jstorm-core/src/main/java/backtype/storm/multilang/BoltMsg.java +++ b/jstorm-core/src/main/java/backtype/storm/multilang/BoltMsg.java @@ -20,15 +20,12 @@ package backtype.storm.multilang; import java.util.List; /** - * BoltMsg is an object that represents the data sent from a shell component to - * a bolt process that implements a multi-language protocol. It is the union of - * all data types that a bolt can receive from Storm. - * + * BoltMsg is an object that represents the data sent from a shell component to a bolt process that implements a multi-language protocol. It is the union of all + * data types that a bolt can receive from Storm. + * * <p> - * BoltMsgs are objects sent to the ISerializer interface, for serialization - * according to the wire protocol implemented by the serializer. The BoltMsg - * class allows for a decoupling between the serialized representation of the - * data and the data itself. + * BoltMsgs are objects sent to the ISerializer interface, for serialization according to the wire protocol implemented by the serializer. The BoltMsg class + * allows for a decoupling between the serialized representation of the data and the data itself. * </p> */ public class BoltMsg { http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/multilang/ISerializer.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/multilang/ISerializer.java b/jstorm-core/src/main/java/backtype/storm/multilang/ISerializer.java index c9c7ad4..6729d89 100755 --- a/jstorm-core/src/main/java/backtype/storm/multilang/ISerializer.java +++ b/jstorm-core/src/main/java/backtype/storm/multilang/ISerializer.java @@ -27,55 +27,52 @@ import java.util.Map; import backtype.storm.task.TopologyContext; /** - * The ISerializer interface describes the methods that an object should - * implement to provide serialization and de-serialization capabilities to - * non-JVM language components. + * The ISerializer interface describes the methods that an object should implement to provide serialization and de-serialization capabilities to non-JVM + * language components. */ public interface ISerializer extends Serializable { /** * This method sets the input and output streams of the serializer - * + * * @param processIn output stream to non-JVM component * @param processOut input stream from non-JVM component */ void initialize(OutputStream processIn, InputStream processOut); /** - * This method transmits the Storm config to the non-JVM process and - * receives its pid. - * + * This method transmits the Storm config to the non-JVM process and receives its pid. + * * @param conf storm configuration * @param context topology context * @return process pid */ - Number connect(Map conf, TopologyContext context) throws IOException, - NoOutputException; + Number connect(Map conf, TopologyContext context) throws IOException, NoOutputException; /** * This method receives a shell message from the non-JVM process - * + * * @return shell message */ ShellMsg readShellMsg() throws IOException, NoOutputException; /** * This method sends a bolt message to a non-JVM bolt process - * + * * @param msg bolt message */ void writeBoltMsg(BoltMsg msg) throws IOException; /** * This method sends a spout message to a non-JVM spout process - * + * * @param msg spout message */ void writeSpoutMsg(SpoutMsg msg) throws IOException; /** * This method sends a list of task IDs to a non-JVM bolt process - * + * * @param taskIds list of task IDs */ void writeTaskIds(List<Integer> taskIds) throws IOException; http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/multilang/JsonSerializer.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/multilang/JsonSerializer.java b/jstorm-core/src/main/java/backtype/storm/multilang/JsonSerializer.java index 0e2e156..bce9a1a 100755 --- a/jstorm-core/src/main/java/backtype/storm/multilang/JsonSerializer.java +++ b/jstorm-core/src/main/java/backtype/storm/multilang/JsonSerializer.java @@ -52,8 +52,7 @@ public class JsonSerializer implements ISerializer { } } - public Number connect(Map conf, TopologyContext context) - throws IOException, NoOutputException { + public Number connect(Map conf, TopologyContext context) throws IOException, NoOutputException { JSONObject setupInfo = new JSONObject(); setupInfo.put("pidDir", context.getPIDDir()); setupInfo.put("conf", conf); @@ -140,22 +139,22 @@ public class JsonSerializer implements ISerializer { shellMsg.addAnchor((String) o); } } - - Object nameObj = msg.get("name"); + + Object nameObj = msg.get("name"); String metricName = null; if (nameObj != null && nameObj instanceof String) { metricName = (String) nameObj; } shellMsg.setMetricName(metricName); - + Object paramsObj = msg.get("params"); shellMsg.setMetricParams(paramsObj); if (command.equals("log")) { Object logLevelObj = msg.get("level"); if (logLevelObj != null && logLevelObj instanceof Long) { - long logLevel = (Long)logLevelObj; - shellMsg.setLogLevel((int)logLevel); + long logLevel = (Long) logLevelObj; + shellMsg.setLogLevel((int) logLevel); } } @@ -183,8 +182,7 @@ public class JsonSerializer implements ISerializer { if (line.length() == 0) { errorMessage.append(" No output read.\n"); } else { - errorMessage.append(" Currently read output: " - + line.toString() + "\n"); + errorMessage.append(" Currently read output: " + line.toString() + "\n"); } errorMessage.append("Serializer Exception:\n"); throw new NoOutputException(errorMessage.toString()); http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/multilang/NoOutputException.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/multilang/NoOutputException.java b/jstorm-core/src/main/java/backtype/storm/multilang/NoOutputException.java index 1ce75d3..58b0a6e 100755 --- a/jstorm-core/src/main/java/backtype/storm/multilang/NoOutputException.java +++ b/jstorm-core/src/main/java/backtype/storm/multilang/NoOutputException.java @@ -18,8 +18,7 @@ package backtype.storm.multilang; /** - * A NoOutputException states that no data has been received from the connected - * non-JVM process. + * A NoOutputException states that no data has been received from the connected non-JVM process. */ public class NoOutputException extends Exception { public NoOutputException() { http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/multilang/ShellMsg.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/multilang/ShellMsg.java b/jstorm-core/src/main/java/backtype/storm/multilang/ShellMsg.java index 9eafb1a..01434e0 100755 --- a/jstorm-core/src/main/java/backtype/storm/multilang/ShellMsg.java +++ b/jstorm-core/src/main/java/backtype/storm/multilang/ShellMsg.java @@ -21,15 +21,12 @@ import java.util.ArrayList; import java.util.List; /** - * ShellMsg is an object that represents the data sent to a shell component from - * a process that implements a multi-language protocol. It is the union of all + * ShellMsg is an object that represents the data sent to a shell component from a process that implements a multi-language protocol. It is the union of all * data types that a component can send to Storm. - * + * * <p> - * ShellMsgs are objects received from the ISerializer interface, after the - * serializer has deserialized the data from the underlying wire protocol. The - * ShellMsg class allows for a decoupling between the serialized representation - * of the data and the data itself. + * ShellMsgs are objects received from the ISerializer interface, after the serializer has deserialized the data from the underlying wire protocol. The ShellMsg + * class allows for a decoupling between the serialized representation of the data and the data itself. * </p> */ public class ShellMsg { @@ -42,22 +39,28 @@ public class ShellMsg { private List<Object> tuple; private boolean needTaskIds; - //metrics rpc + // metrics rpc private String metricName; private Object metricParams; - //logLevel + // logLevel public enum ShellLogLevel { TRACE, DEBUG, INFO, WARN, ERROR; public static ShellLogLevel fromInt(int i) { switch (i) { - case 0: return TRACE; - case 1: return DEBUG; - case 2: return INFO; - case 3: return WARN; - case 4: return ERROR; - default: return INFO; + case 0: + return TRACE; + case 1: + return DEBUG; + case 2: + return INFO; + case 3: + return WARN; + case 4: + return ERROR; + default: + return INFO; } } } @@ -168,18 +171,8 @@ public class ShellMsg { @Override public String toString() { - return "ShellMsg{" + - "command='" + command + '\'' + - ", id=" + id + - ", anchors=" + anchors + - ", stream='" + stream + '\'' + - ", task=" + task + - ", msg='" + msg + '\'' + - ", tuple=" + tuple + - ", needTaskIds=" + needTaskIds + - ", metricName='" + metricName + '\'' + - ", metricParams=" + metricParams + - ", logLevel=" + logLevel + - '}'; + return "ShellMsg{" + "command='" + command + '\'' + ", id=" + id + ", anchors=" + anchors + ", stream='" + stream + '\'' + ", task=" + task + ", msg='" + + msg + '\'' + ", tuple=" + tuple + ", needTaskIds=" + needTaskIds + ", metricName='" + metricName + '\'' + ", metricParams=" + metricParams + + ", logLevel=" + logLevel + '}'; } } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/multilang/SpoutMsg.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/multilang/SpoutMsg.java b/jstorm-core/src/main/java/backtype/storm/multilang/SpoutMsg.java index cb1b108..c08646c 100755 --- a/jstorm-core/src/main/java/backtype/storm/multilang/SpoutMsg.java +++ b/jstorm-core/src/main/java/backtype/storm/multilang/SpoutMsg.java @@ -18,15 +18,12 @@ package backtype.storm.multilang; /** - * SpoutMsg is an object that represents the data sent from a shell spout to a - * process that implements a multi-language spout. The SpoutMsg is used to send - * a "next", "ack" or "fail" message to a spout. - * + * SpoutMsg is an object that represents the data sent from a shell spout to a process that implements a multi-language spout. The SpoutMsg is used to send a + * "next", "ack" or "fail" message to a spout. + * * <p> - * Spout messages are objects sent to the ISerializer interface, for - * serialization according to the wire protocol implemented by the serializer. - * The SpoutMsg class allows for a decoupling between the serialized - * representation of the data and the data itself. + * Spout messages are objects sent to the ISerializer interface, for serialization according to the wire protocol implemented by the serializer. The SpoutMsg + * class allows for a decoupling between the serialized representation of the data and the data itself. * </p> */ public class SpoutMsg { http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/nimbus/DefaultTopologyValidator.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/nimbus/DefaultTopologyValidator.java b/jstorm-core/src/main/java/backtype/storm/nimbus/DefaultTopologyValidator.java index a687215..14c5723 100755 --- a/jstorm-core/src/main/java/backtype/storm/nimbus/DefaultTopologyValidator.java +++ b/jstorm-core/src/main/java/backtype/storm/nimbus/DefaultTopologyValidator.java @@ -23,9 +23,10 @@ import java.util.Map; public class DefaultTopologyValidator implements ITopologyValidator { @Override - public void prepare(Map StormConf){ + public void prepare(Map StormConf) { } + @Override - public void validate(String topologyName, Map topologyConf, StormTopology topology) throws InvalidTopologyException { - } + public void validate(String topologyName, Map topologyConf, StormTopology topology) throws InvalidTopologyException { + } } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/nimbus/ITopologyValidator.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/nimbus/ITopologyValidator.java b/jstorm-core/src/main/java/backtype/storm/nimbus/ITopologyValidator.java index 99bd07b..36e2c18 100755 --- a/jstorm-core/src/main/java/backtype/storm/nimbus/ITopologyValidator.java +++ b/jstorm-core/src/main/java/backtype/storm/nimbus/ITopologyValidator.java @@ -23,6 +23,6 @@ import java.util.Map; public interface ITopologyValidator { void prepare(Map StormConf); - void validate(String topologyName, Map topologyConf, StormTopology topology) - throws InvalidTopologyException; + + void validate(String topologyName, Map topologyConf, StormTopology topology) throws InvalidTopologyException; } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/planner/CompoundSpout.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/planner/CompoundSpout.java b/jstorm-core/src/main/java/backtype/storm/planner/CompoundSpout.java index 141b24b..6665d82 100755 --- a/jstorm-core/src/main/java/backtype/storm/planner/CompoundSpout.java +++ b/jstorm-core/src/main/java/backtype/storm/planner/CompoundSpout.java @@ -17,9 +17,8 @@ */ package backtype.storm.planner; - public class CompoundSpout - //implements ISpout +// implements ISpout { } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/planner/CompoundTask.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/planner/CompoundTask.java b/jstorm-core/src/main/java/backtype/storm/planner/CompoundTask.java index 40a7f37..2bd56ee 100755 --- a/jstorm-core/src/main/java/backtype/storm/planner/CompoundTask.java +++ b/jstorm-core/src/main/java/backtype/storm/planner/CompoundTask.java @@ -17,9 +17,8 @@ */ package backtype.storm.planner; - public class CompoundTask -// implements IBolt +// implements IBolt { } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/planner/TaskBundle.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/planner/TaskBundle.java b/jstorm-core/src/main/java/backtype/storm/planner/TaskBundle.java index 81c6209..4542aad 100755 --- a/jstorm-core/src/main/java/backtype/storm/planner/TaskBundle.java +++ b/jstorm-core/src/main/java/backtype/storm/planner/TaskBundle.java @@ -20,14 +20,13 @@ package backtype.storm.planner; import backtype.storm.task.IBolt; import java.io.Serializable; - public class TaskBundle implements Serializable { public IBolt task; public int componentId; - + public TaskBundle(IBolt task, int componentId) { this.task = task; this.componentId = componentId; } - + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/scheduler/Cluster.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/scheduler/Cluster.java b/jstorm-core/src/main/java/backtype/storm/scheduler/Cluster.java index e0c7cc7..14d9ede 100755 --- a/jstorm-core/src/main/java/backtype/storm/scheduler/Cluster.java +++ b/jstorm-core/src/main/java/backtype/storm/scheduler/Cluster.java @@ -30,25 +30,25 @@ public class Cluster { /** * key: supervisor id, value: supervisor details */ - private Map<String, SupervisorDetails> supervisors; + private Map<String, SupervisorDetails> supervisors; /** * key: topologyId, value: topology's current assignments. */ private Map<String, SchedulerAssignmentImpl> assignments; /** * key topologyId, Value: scheduler's status. - */ + */ private Map<String, String> status; /** * a map from hostname to supervisor id. */ - private Map<String, List<String>> hostToId; - + private Map<String, List<String>> hostToId; + private Set<String> blackListedHosts = new HashSet<String>(); private INimbus inimbus; - public Cluster(INimbus nimbus, Map<String, SupervisorDetails> supervisors, Map<String, SchedulerAssignmentImpl> assignments){ + public Cluster(INimbus nimbus, Map<String, SupervisorDetails> supervisors, Map<String, SchedulerAssignmentImpl> assignments) { this.inimbus = nimbus; this.supervisors = new HashMap<String, SupervisorDetails>(supervisors.size()); this.supervisors.putAll(supervisors); @@ -65,35 +65,36 @@ public class Cluster { this.hostToId.get(host).add(nodeId); } } - + public void setBlacklistedHosts(Set<String> hosts) { blackListedHosts = hosts; } - + public Set<String> getBlacklistedHosts() { return blackListedHosts; } - + public void blacklistHost(String host) { // this is so it plays well with setting blackListedHosts to an immutable list - if(blackListedHosts==null) blackListedHosts = new HashSet<String>(); - if(!(blackListedHosts instanceof HashSet)) + if (blackListedHosts == null) + blackListedHosts = new HashSet<String>(); + if (!(blackListedHosts instanceof HashSet)) blackListedHosts = new HashSet<String>(blackListedHosts); blackListedHosts.add(host); } - + public boolean isBlackListed(String supervisorId) { - return blackListedHosts != null && blackListedHosts.contains(getHost(supervisorId)); + return blackListedHosts != null && blackListedHosts.contains(getHost(supervisorId)); } public boolean isBlacklistedHost(String host) { - return blackListedHosts != null && blackListedHosts.contains(host); + return blackListedHosts != null && blackListedHosts.contains(host); } - + public String getHost(String supervisorId) { return inimbus.getHostName(supervisors, supervisorId); } - + /** * Gets all the topologies which needs scheduling. * @@ -116,8 +117,8 @@ public class Cluster { * * A topology needs scheduling if one of the following conditions holds: * <ul> - * <li>Although the topology is assigned slots, but is squeezed. i.e. the topology is assigned less slots than desired.</li> - * <li>There are unassigned executors in this topology</li> + * <li>Although the topology is assigned slots, but is squeezed. i.e. the topology is assigned less slots than desired.</li> + * <li>There are unassigned executors in this topology</li> * </ul> */ public boolean needsScheduling(TopologyDetails topology) { @@ -139,7 +140,7 @@ public class Cluster { */ public Map<ExecutorDetails, String> getNeedsSchedulingExecutorToComponents(TopologyDetails topology) { Collection<ExecutorDetails> allExecutors = new HashSet(topology.getExecutors()); - + SchedulerAssignment assignment = this.assignments.get(topology.getId()); if (assignment != null) { Collection<ExecutorDetails> assignedExecutors = assignment.getExecutors(); @@ -148,7 +149,7 @@ public class Cluster { return topology.selectExecutorToComponent(allExecutors); } - + /** * Gets a component-id -> executors map which needs scheduling in this topology. * @@ -163,14 +164,13 @@ public class Cluster { if (!componentToExecutors.containsKey(component)) { componentToExecutors.put(component, new ArrayList<ExecutorDetails>()); } - + componentToExecutors.get(component).add(executor); } - + return componentToExecutors; } - /** * Get all the used ports of this supervisor. * @@ -207,9 +207,10 @@ public class Cluster { return ret; } - + public Set<Integer> getAssignablePorts(SupervisorDetails supervisor) { - if(isBlackListed(supervisor.id)) return new HashSet(); + if (isBlackListed(supervisor.id)) + return new HashSet(); return supervisor.allPorts; } @@ -229,7 +230,7 @@ public class Cluster { return slots; } - + public List<WorkerSlot> getAssignableSlots(SupervisorDetails supervisor) { Set<Integer> ports = this.getAssignablePorts(supervisor); List<WorkerSlot> slots = new ArrayList<WorkerSlot>(ports.size()); @@ -238,9 +239,9 @@ public class Cluster { slots.add(new WorkerSlot(supervisor.getId(), port)); } - return slots; + return slots; } - + /** * get the unassigned executors of the topology. */ @@ -250,13 +251,13 @@ public class Cluster { } Collection<ExecutorDetails> ret = new HashSet(topology.getExecutors()); - + SchedulerAssignment assignment = this.getAssignmentById(topology.getId()); if (assignment != null) { Set<ExecutorDetails> assignedExecutors = assignment.getExecutors(); ret.removeAll(assignedExecutors); } - + return ret; } @@ -287,16 +288,16 @@ public class Cluster { if (this.isSlotOccupied(slot)) { throw new RuntimeException("slot: [" + slot.getNodeId() + ", " + slot.getPort() + "] is already occupied."); } - - SchedulerAssignmentImpl assignment = (SchedulerAssignmentImpl)this.getAssignmentById(topologyId); + + SchedulerAssignmentImpl assignment = (SchedulerAssignmentImpl) this.getAssignmentById(topologyId); if (assignment == null) { assignment = new SchedulerAssignmentImpl(topologyId, new HashMap<ExecutorDetails, WorkerSlot>()); this.assignments.put(topologyId, assignment); } else { for (ExecutorDetails executor : executors) { - if (assignment.isExecutorAssigned(executor)) { - throw new RuntimeException("the executor is already assigned, you should unassign it before assign it to another slot."); - } + if (assignment.isExecutorAssigned(executor)) { + throw new RuntimeException("the executor is already assigned, you should unassign it before assign it to another slot."); + } } } @@ -316,7 +317,7 @@ public class Cluster { return slots; } - + public List<WorkerSlot> getAssignableSlots() { List<WorkerSlot> slots = new ArrayList<WorkerSlot>(); for (SupervisorDetails supervisor : this.supervisors.values()) { @@ -339,14 +340,14 @@ public class Cluster { } } } - + /** * free the slots. * * @param slots */ public void freeSlots(Collection<WorkerSlot> slots) { - if(slots!=null) { + if (slots != null) { for (WorkerSlot slot : slots) { this.freeSlot(slot); } @@ -365,10 +366,10 @@ public class Cluster { return true; } } - + return false; } - + /** * get the current assignment for the topology. */ @@ -390,10 +391,10 @@ public class Cluster { return null; } - + public Collection<WorkerSlot> getUsedSlots() { Set<WorkerSlot> ret = new HashSet(); - for(SchedulerAssignmentImpl s: assignments.values()) { + for (SchedulerAssignmentImpl s : assignments.values()) { ret.addAll(s.getExecutorToSlot().values()); } return ret; @@ -423,11 +424,11 @@ public class Cluster { */ public Map<String, SchedulerAssignment> getAssignments() { Map<String, SchedulerAssignment> ret = new HashMap<String, SchedulerAssignment>(this.assignments.size()); - + for (String topologyId : this.assignments.keySet()) { ret.put(topologyId, this.assignments.get(topologyId)); } - + return ret; } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/scheduler/ExecutorDetails.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/scheduler/ExecutorDetails.java b/jstorm-core/src/main/java/backtype/storm/scheduler/ExecutorDetails.java index bcf4aca..5934d33 100755 --- a/jstorm-core/src/main/java/backtype/storm/scheduler/ExecutorDetails.java +++ b/jstorm-core/src/main/java/backtype/storm/scheduler/ExecutorDetails.java @@ -21,7 +21,7 @@ public class ExecutorDetails { int startTask; int endTask; - public ExecutorDetails(int startTask, int endTask){ + public ExecutorDetails(int startTask, int endTask) { this.startTask = startTask; this.endTask = endTask; } @@ -38,17 +38,17 @@ public class ExecutorDetails { if (other == null || !(other instanceof ExecutorDetails)) { return false; } - - ExecutorDetails executor = (ExecutorDetails)other; + + ExecutorDetails executor = (ExecutorDetails) other; return (this.startTask == executor.startTask) && (this.endTask == executor.endTask); } - + public int hashCode() { return this.startTask + 13 * this.endTask; } - + @Override public String toString() { - return "[" + this.startTask + ", " + this.endTask + "]"; + return "[" + this.startTask + ", " + this.endTask + "]"; } } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/scheduler/INimbus.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/scheduler/INimbus.java b/jstorm-core/src/main/java/backtype/storm/scheduler/INimbus.java index a0fb417..b13beb3 100755 --- a/jstorm-core/src/main/java/backtype/storm/scheduler/INimbus.java +++ b/jstorm-core/src/main/java/backtype/storm/scheduler/INimbus.java @@ -23,17 +23,19 @@ import java.util.Set; public interface INimbus { void prepare(Map stormConf, String schedulerLocalDir); + /** - * Returns all slots that are available for the next round of scheduling. A slot is available for scheduling - * if it is free and can be assigned to, or if it is used and can be reassigned. + * Returns all slots that are available for the next round of scheduling. A slot is available for scheduling if it is free and can be assigned to, or if it + * is used and can be reassigned. */ - Collection<WorkerSlot> allSlotsAvailableForScheduling(Collection<SupervisorDetails> existingSupervisors, Topologies topologies, Set<String> topologiesMissingAssignments); + Collection<WorkerSlot> allSlotsAvailableForScheduling(Collection<SupervisorDetails> existingSupervisors, Topologies topologies, + Set<String> topologiesMissingAssignments); // this is called after the assignment is changed in ZK void assignSlots(Topologies topologies, Map<String, Collection<WorkerSlot>> newSlotsByTopologyId); - + // map from node id to supervisor details String getHostName(Map<String, SupervisorDetails> existingSupervisors, String nodeId); - - IScheduler getForcedScheduler(); + + IScheduler getForcedScheduler(); } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/scheduler/IScheduler.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/scheduler/IScheduler.java b/jstorm-core/src/main/java/backtype/storm/scheduler/IScheduler.java index 5395882..93096f8 100755 --- a/jstorm-core/src/main/java/backtype/storm/scheduler/IScheduler.java +++ b/jstorm-core/src/main/java/backtype/storm/scheduler/IScheduler.java @@ -19,22 +19,18 @@ package backtype.storm.scheduler; import java.util.Map; - public interface IScheduler { - + void prepare(Map conf); - + /** - * Set assignments for the topologies which needs scheduling. The new assignments is available - * through <code>cluster.getAssignments()</code> - * - *@param topologies all the topologies in the cluster, some of them need schedule. Topologies object here - * only contain static information about topologies. Information like assignments, slots are all in - * the <code>cluster</code>object. - *@param cluster the cluster these topologies are running in. <code>cluster</code> contains everything user - * need to develop a new scheduling logic. e.g. supervisors information, available slots, current - * assignments for all the topologies etc. User can set the new assignment for topologies using - * <code>cluster.setAssignmentById</code> + * Set assignments for the topologies which needs scheduling. The new assignments is available through <code>cluster.getAssignments()</code> + * + * @param topologies all the topologies in the cluster, some of them need schedule. Topologies object here only contain static information about topologies. + * Information like assignments, slots are all in the <code>cluster</code>object. + * @param cluster the cluster these topologies are running in. <code>cluster</code> contains everything user need to develop a new scheduling logic. e.g. + * supervisors information, available slots, current assignments for all the topologies etc. User can set the new assignment for topologies using + * <code>cluster.setAssignmentById</code> */ void schedule(Topologies topologies, Cluster cluster); } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/scheduler/ISupervisor.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/scheduler/ISupervisor.java b/jstorm-core/src/main/java/backtype/storm/scheduler/ISupervisor.java index 64e1595..d64f851 100755 --- a/jstorm-core/src/main/java/backtype/storm/scheduler/ISupervisor.java +++ b/jstorm-core/src/main/java/backtype/storm/scheduler/ISupervisor.java @@ -20,26 +20,29 @@ package backtype.storm.scheduler; import java.util.Map; import java.util.Collection; - public interface ISupervisor { void prepare(Map stormConf, String schedulerLocalDir); + // for mesos, this is {hostname}-{topologyid} /** * The id used for writing metadata into ZK. */ String getSupervisorId(); + /** - * The id used in assignments. This combined with confirmAssigned decides what - * this supervisor is responsible for. The combination of this and getSupervisorId - * allows Nimbus to assign to a single machine and have multiple supervisors - * on that machine execute the assignment. This is important for achieving resource isolation. + * The id used in assignments. This combined with confirmAssigned decides what this supervisor is responsible for. The combination of this and + * getSupervisorId allows Nimbus to assign to a single machine and have multiple supervisors on that machine execute the assignment. This is important for + * achieving resource isolation. */ String getAssignmentId(); + Object getMetadata(); - + boolean confirmAssigned(int port); + // calls this before actually killing the worker locally... // sends a "task finished" update void killedWorker(int port); + void assigned(Collection<Integer> ports); } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/scheduler/SchedulerAssignment.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/scheduler/SchedulerAssignment.java b/jstorm-core/src/main/java/backtype/storm/scheduler/SchedulerAssignment.java index 0212e48..7451dcc 100755 --- a/jstorm-core/src/main/java/backtype/storm/scheduler/SchedulerAssignment.java +++ b/jstorm-core/src/main/java/backtype/storm/scheduler/SchedulerAssignment.java @@ -23,6 +23,7 @@ import java.util.Set; public interface SchedulerAssignment { /** * Does this slot occupied by this assignment? + * * @param slot * @return */ @@ -35,24 +36,27 @@ public interface SchedulerAssignment { * @return */ public boolean isExecutorAssigned(ExecutorDetails executor); - + /** * get the topology-id this assignment is for. + * * @return */ public String getTopologyId(); /** * get the executor -> slot map. + * * @return */ public Map<ExecutorDetails, WorkerSlot> getExecutorToSlot(); /** * Return the executors covered by this assignments + * * @return */ public Set<ExecutorDetails> getExecutors(); - + public Set<WorkerSlot> getSlots(); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/scheduler/SchedulerAssignmentImpl.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/scheduler/SchedulerAssignmentImpl.java b/jstorm-core/src/main/java/backtype/storm/scheduler/SchedulerAssignmentImpl.java index 08af4b7..7a6947f 100755 --- a/jstorm-core/src/main/java/backtype/storm/scheduler/SchedulerAssignmentImpl.java +++ b/jstorm-core/src/main/java/backtype/storm/scheduler/SchedulerAssignmentImpl.java @@ -35,7 +35,7 @@ public class SchedulerAssignmentImpl implements SchedulerAssignment { * assignment detail, a mapping from executor to <code>WorkerSlot</code> */ Map<ExecutorDetails, WorkerSlot> executorToSlot; - + public SchedulerAssignmentImpl(String topologyId, Map<ExecutorDetails, WorkerSlot> executorToSlots) { this.topologyId = topologyId; this.executorToSlot = new HashMap<ExecutorDetails, WorkerSlot>(0); @@ -47,10 +47,11 @@ public class SchedulerAssignmentImpl implements SchedulerAssignment { @Override public Set<WorkerSlot> getSlots() { return new HashSet(executorToSlot.values()); - } - + } + /** * Assign the slot to executors. + * * @param slot * @param executors */ @@ -59,9 +60,10 @@ public class SchedulerAssignmentImpl implements SchedulerAssignment { this.executorToSlot.put(executor, slot); } } - + /** * Release the slot occupied by this assignment. + * * @param slot */ public void unassignBySlot(WorkerSlot slot) { @@ -72,7 +74,7 @@ public class SchedulerAssignmentImpl implements SchedulerAssignment { executors.add(executor); } } - + // remove for (ExecutorDetails executor : executors) { this.executorToSlot.remove(executor); @@ -81,6 +83,7 @@ public class SchedulerAssignmentImpl implements SchedulerAssignment { /** * Does this slot occupied by this assignment? + * * @param slot * @return */ @@ -91,7 +94,7 @@ public class SchedulerAssignmentImpl implements SchedulerAssignment { public boolean isExecutorAssigned(ExecutorDetails executor) { return this.executorToSlot.containsKey(executor); } - + public String getTopologyId() { return this.topologyId; } @@ -102,6 +105,7 @@ public class SchedulerAssignmentImpl implements SchedulerAssignment { /** * Return the executors covered by this assignments + * * @return */ public Set<ExecutorDetails> getExecutors() { http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/scheduler/SupervisorDetails.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/scheduler/SupervisorDetails.java b/jstorm-core/src/main/java/backtype/storm/scheduler/SupervisorDetails.java index 7497f26..2b8a400 100755 --- a/jstorm-core/src/main/java/backtype/storm/scheduler/SupervisorDetails.java +++ b/jstorm-core/src/main/java/backtype/storm/scheduler/SupervisorDetails.java @@ -38,19 +38,19 @@ public class SupervisorDetails { */ Set<Integer> allPorts; - public SupervisorDetails(String id, Object meta){ + public SupervisorDetails(String id, Object meta) { this.id = id; this.meta = meta; allPorts = new HashSet(); } - - public SupervisorDetails(String id, Object meta, Collection<Number> allPorts){ + + public SupervisorDetails(String id, Object meta, Collection<Number> allPorts) { this.id = id; this.meta = meta; setAllPorts(allPorts); } - public SupervisorDetails(String id, String host, Object schedulerMeta, Collection<Number> allPorts){ + public SupervisorDetails(String id, String host, Object schedulerMeta, Collection<Number> allPorts) { this.id = id; this.host = host; this.schedulerMeta = schedulerMeta; @@ -60,8 +60,8 @@ public class SupervisorDetails { private void setAllPorts(Collection<Number> allPorts) { this.allPorts = new HashSet<Integer>(); - if(allPorts!=null) { - for(Number n: allPorts) { + if (allPorts != null) { + for (Number n : allPorts) { this.allPorts.add(n.intValue()); } } @@ -78,7 +78,7 @@ public class SupervisorDetails { public Object getMeta() { return meta; } - + public Set<Integer> getAllPorts() { return allPorts; } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/scheduler/Topologies.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/scheduler/Topologies.java b/jstorm-core/src/main/java/backtype/storm/scheduler/Topologies.java index 70af1b4..771fcf2 100755 --- a/jstorm-core/src/main/java/backtype/storm/scheduler/Topologies.java +++ b/jstorm-core/src/main/java/backtype/storm/scheduler/Topologies.java @@ -24,33 +24,34 @@ import java.util.Map; public class Topologies { Map<String, TopologyDetails> topologies; Map<String, String> nameToId; - + public Topologies(Map<String, TopologyDetails> topologies) { - if(topologies==null) topologies = new HashMap(); + if (topologies == null) + topologies = new HashMap(); this.topologies = new HashMap<String, TopologyDetails>(topologies.size()); this.topologies.putAll(topologies); this.nameToId = new HashMap<String, String>(topologies.size()); - + for (String topologyId : topologies.keySet()) { TopologyDetails topology = topologies.get(topologyId); this.nameToId.put(topology.getName(), topologyId); } } - + public TopologyDetails getById(String topologyId) { return this.topologies.get(topologyId); } - + public TopologyDetails getByName(String topologyName) { String topologyId = this.nameToId.get(topologyName); - + if (topologyId == null) { return null; } else { return this.getById(topologyId); } } - + public Collection<TopologyDetails> getTopologies() { return this.topologies.values(); } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/scheduler/TopologyDetails.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/scheduler/TopologyDetails.java b/jstorm-core/src/main/java/backtype/storm/scheduler/TopologyDetails.java index 6daf4ed..84b3966 100755 --- a/jstorm-core/src/main/java/backtype/storm/scheduler/TopologyDetails.java +++ b/jstorm-core/src/main/java/backtype/storm/scheduler/TopologyDetails.java @@ -24,21 +24,20 @@ import java.util.Map; import backtype.storm.Config; import backtype.storm.generated.StormTopology; - public class TopologyDetails { String topologyId; Map topologyConf; StormTopology topology; Map<ExecutorDetails, String> executorToComponent; int numWorkers; - + public TopologyDetails(String topologyId, Map topologyConf, StormTopology topology, int numWorkers) { this.topologyId = topologyId; this.topologyConf = topologyConf; this.topology = topology; this.numWorkers = numWorkers; } - + public TopologyDetails(String topologyId, Map topologyConf, StormTopology topology, int numWorkers, Map<ExecutorDetails, String> executorToComponents) { this(topologyId, topologyConf, topology, numWorkers); this.executorToComponent = new HashMap<ExecutorDetails, String>(0); @@ -46,23 +45,23 @@ public class TopologyDetails { this.executorToComponent.putAll(executorToComponents); } } - + public String getId() { return topologyId; } - + public String getName() { - return (String)this.topologyConf.get(Config.TOPOLOGY_NAME); + return (String) this.topologyConf.get(Config.TOPOLOGY_NAME); } - + public Map getConf() { return topologyConf; } - + public int getNumWorkers() { return numWorkers; } - + public StormTopology getTopology() { return topology; } @@ -79,10 +78,10 @@ public class TopologyDetails { ret.put(executor, compId); } } - + return ret; } - + public Collection<ExecutorDetails> getExecutors() { return this.executorToComponent.keySet(); } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/scheduler/WorkerSlot.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/scheduler/WorkerSlot.java b/jstorm-core/src/main/java/backtype/storm/scheduler/WorkerSlot.java index 8331ad8..baeb233 100755 --- a/jstorm-core/src/main/java/backtype/storm/scheduler/WorkerSlot.java +++ b/jstorm-core/src/main/java/backtype/storm/scheduler/WorkerSlot.java @@ -20,36 +20,36 @@ package backtype.storm.scheduler; import java.io.Serializable; public class WorkerSlot implements Comparable<WorkerSlot>, Serializable { - + private static final long serialVersionUID = -4451854497340313268L; String nodeId; int port; - + public WorkerSlot(String nodeId, Number port) { this.nodeId = nodeId; this.port = port.intValue(); } - + public WorkerSlot() { - + } - + public String getNodeId() { return nodeId; } - + public int getPort() { return port; } - + public void setNodeId(String nodeId) { this.nodeId = nodeId; } - + public void setPort(int port) { this.port = port; } - + @Override public int hashCode() { final int prime = 31; @@ -58,7 +58,7 @@ public class WorkerSlot implements Comparable<WorkerSlot>, Serializable { result = prime * result + port; return result; } - + @Override public boolean equals(Object obj) { if (this == obj) @@ -77,12 +77,12 @@ public class WorkerSlot implements Comparable<WorkerSlot>, Serializable { return false; return true; } - + @Override public String toString() { return this.nodeId + ":" + this.port; } - + @Override public int compareTo(WorkerSlot o) { String otherNode = o.getNodeId(); http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/scheduler/multitenant/DefaultPool.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/scheduler/multitenant/DefaultPool.java b/jstorm-core/src/main/java/backtype/storm/scheduler/multitenant/DefaultPool.java index 3053b5b..8064a0d 100755 --- a/jstorm-core/src/main/java/backtype/storm/scheduler/multitenant/DefaultPool.java +++ b/jstorm-core/src/main/java/backtype/storm/scheduler/multitenant/DefaultPool.java @@ -36,184 +36,183 @@ import backtype.storm.scheduler.WorkerSlot; * A pool of machines that anyone can use, but topologies are not isolated */ public class DefaultPool extends NodePool { - private static final Logger LOG = LoggerFactory.getLogger(DefaultPool.class); - private Set<Node> _nodes = new HashSet<Node>(); - private HashMap<String, TopologyDetails> _tds = new HashMap<String, TopologyDetails>(); - - @Override - public void addTopology(TopologyDetails td) { - String topId = td.getId(); - LOG.debug("Adding in Topology {}", topId); - _tds.put(topId, td); - SchedulerAssignment assignment = _cluster.getAssignmentById(topId); - if (assignment != null) { - for (WorkerSlot ws: assignment.getSlots()) { - Node n = _nodeIdToNode.get(ws.getNodeId()); - _nodes.add(n); - } - } - } - - @Override - public boolean canAdd(TopologyDetails td) { - return true; - } - - @Override - public Collection<Node> takeNodes(int nodesNeeded) { - HashSet<Node> ret = new HashSet<Node>(); - LinkedList<Node> sortedNodes = new LinkedList<Node>(_nodes); - Collections.sort(sortedNodes, Node.FREE_NODE_COMPARATOR_DEC); - for (Node n: sortedNodes) { - if (nodesNeeded <= ret.size()) { - break; - } - if (n.isAlive()) { - n.freeAllSlots(_cluster); - _nodes.remove(n); - ret.add(n); - } - } - return ret; - } - - @Override - public int nodesAvailable() { - int total = 0; - for (Node n: _nodes) { - if (n.isAlive()) total++; - } - return total; - } - - @Override - public int slotsAvailable() { - return Node.countTotalSlotsAlive(_nodes); - } - - @Override - public NodeAndSlotCounts getNodeAndSlotCountIfSlotsWereTaken(int slotsNeeded) { - int nodesFound = 0; - int slotsFound = 0; - LinkedList<Node> sortedNodes = new LinkedList<Node>(_nodes); - Collections.sort(sortedNodes, Node.FREE_NODE_COMPARATOR_DEC); - for (Node n: sortedNodes) { - if (slotsNeeded <= 0) { - break; - } - if (n.isAlive()) { - nodesFound++; - int totalSlotsFree = n.totalSlots(); - slotsFound += totalSlotsFree; - slotsNeeded -= totalSlotsFree; - } + private static final Logger LOG = LoggerFactory.getLogger(DefaultPool.class); + private Set<Node> _nodes = new HashSet<Node>(); + private HashMap<String, TopologyDetails> _tds = new HashMap<String, TopologyDetails>(); + + @Override + public void addTopology(TopologyDetails td) { + String topId = td.getId(); + LOG.debug("Adding in Topology {}", topId); + _tds.put(topId, td); + SchedulerAssignment assignment = _cluster.getAssignmentById(topId); + if (assignment != null) { + for (WorkerSlot ws : assignment.getSlots()) { + Node n = _nodeIdToNode.get(ws.getNodeId()); + _nodes.add(n); + } + } } - return new NodeAndSlotCounts(nodesFound, slotsFound); - } - - @Override - public Collection<Node> takeNodesBySlots(int slotsNeeded) { - HashSet<Node> ret = new HashSet<Node>(); - LinkedList<Node> sortedNodes = new LinkedList<Node>(_nodes); - Collections.sort(sortedNodes, Node.FREE_NODE_COMPARATOR_DEC); - for (Node n: sortedNodes) { - if (slotsNeeded <= 0) { - break; - } - if (n.isAlive()) { - n.freeAllSlots(_cluster); - _nodes.remove(n); - ret.add(n); - slotsNeeded -= n.totalSlotsFree(); - } + + @Override + public boolean canAdd(TopologyDetails td) { + return true; } - return ret; - } - - @Override - public void scheduleAsNeeded(NodePool... lesserPools) { - for (TopologyDetails td : _tds.values()) { - String topId = td.getId(); - if (_cluster.needsScheduling(td)) { - LOG.debug("Scheduling topology {}",topId); - int totalTasks = td.getExecutors().size(); - int origRequest = td.getNumWorkers(); - int slotsRequested = Math.min(totalTasks, origRequest); - int slotsUsed = Node.countSlotsUsed(topId, _nodes); - int slotsFree = Node.countFreeSlotsAlive(_nodes); - //Check to see if we have enough slots before trying to get them - int slotsAvailable = 0; - if (slotsRequested > slotsFree) { - slotsAvailable = NodePool.slotsAvailable(lesserPools); - } - int slotsToUse = Math.min(slotsRequested - slotsUsed, slotsFree + slotsAvailable); - int executorsNotRunning = _cluster.getUnassignedExecutors(td).size(); - LOG.debug("Slots... requested {} used {} free {} available {} to be used {}, executors not running {}", - new Object[] {slotsRequested, slotsUsed, slotsFree, slotsAvailable, slotsToUse, executorsNotRunning}); - if (slotsToUse <= 0) { - if (executorsNotRunning > 0) { - _cluster.setStatus(topId,"Not fully scheduled (No free slots in default pool) "+executorsNotRunning+" executors not scheduled"); - } else { - if (slotsUsed < slotsRequested) { - _cluster.setStatus(topId,"Running with fewer slots than requested ("+slotsUsed+"/"+origRequest+")"); - } else { //slotsUsed < origRequest - _cluster.setStatus(topId,"Fully Scheduled (requested "+origRequest+" slots, but could only use "+slotsUsed+")"); + + @Override + public Collection<Node> takeNodes(int nodesNeeded) { + HashSet<Node> ret = new HashSet<Node>(); + LinkedList<Node> sortedNodes = new LinkedList<Node>(_nodes); + Collections.sort(sortedNodes, Node.FREE_NODE_COMPARATOR_DEC); + for (Node n : sortedNodes) { + if (nodesNeeded <= ret.size()) { + break; + } + if (n.isAlive()) { + n.freeAllSlots(_cluster); + _nodes.remove(n); + ret.add(n); } - } - continue; } + return ret; + } - int slotsNeeded = slotsToUse - slotsFree; - if (slotsNeeded > 0) { - _nodes.addAll(NodePool.takeNodesBySlot(slotsNeeded, lesserPools)); + @Override + public int nodesAvailable() { + int total = 0; + for (Node n : _nodes) { + if (n.isAlive()) + total++; } + return total; + } + + @Override + public int slotsAvailable() { + return Node.countTotalSlotsAlive(_nodes); + } - if (executorsNotRunning <= 0) { - //There are free slots that we can take advantage of now. - for (Node n: _nodes) { - n.freeTopology(topId, _cluster); - } - slotsFree = Node.countFreeSlotsAlive(_nodes); - slotsToUse = Math.min(slotsRequested, slotsFree); + @Override + public NodeAndSlotCounts getNodeAndSlotCountIfSlotsWereTaken(int slotsNeeded) { + int nodesFound = 0; + int slotsFound = 0; + LinkedList<Node> sortedNodes = new LinkedList<Node>(_nodes); + Collections.sort(sortedNodes, Node.FREE_NODE_COMPARATOR_DEC); + for (Node n : sortedNodes) { + if (slotsNeeded <= 0) { + break; + } + if (n.isAlive()) { + nodesFound++; + int totalSlotsFree = n.totalSlots(); + slotsFound += totalSlotsFree; + slotsNeeded -= totalSlotsFree; + } } - - RoundRobinSlotScheduler slotSched = - new RoundRobinSlotScheduler(td, slotsToUse, _cluster); - - LinkedList<Node> nodes = new LinkedList<Node>(_nodes); - while (true) { - Node n = null; - do { - if (nodes.isEmpty()) { - throw new IllegalStateException("This should not happen, we" + - " messed up and did not get enough slots"); + return new NodeAndSlotCounts(nodesFound, slotsFound); + } + + @Override + public Collection<Node> takeNodesBySlots(int slotsNeeded) { + HashSet<Node> ret = new HashSet<Node>(); + LinkedList<Node> sortedNodes = new LinkedList<Node>(_nodes); + Collections.sort(sortedNodes, Node.FREE_NODE_COMPARATOR_DEC); + for (Node n : sortedNodes) { + if (slotsNeeded <= 0) { + break; } - n = nodes.peekFirst(); - if (n.totalSlotsFree() == 0) { - nodes.remove(); - n = null; + if (n.isAlive()) { + n.freeAllSlots(_cluster); + _nodes.remove(n); + ret.add(n); + slotsNeeded -= n.totalSlotsFree(); } - } while (n == null); - if (!slotSched.assignSlotTo(n)) { - break; - } } - int afterSchedSlotsUsed = Node.countSlotsUsed(topId, _nodes); - if (afterSchedSlotsUsed < slotsRequested) { - _cluster.setStatus(topId,"Running with fewer slots than requested ("+afterSchedSlotsUsed+"/"+origRequest+")"); - } else if (afterSchedSlotsUsed < origRequest) { - _cluster.setStatus(topId,"Fully Scheduled (requested "+origRequest+" slots, but could only use "+afterSchedSlotsUsed+")"); - } else { - _cluster.setStatus(topId,"Fully Scheduled"); + return ret; + } + + @Override + public void scheduleAsNeeded(NodePool... lesserPools) { + for (TopologyDetails td : _tds.values()) { + String topId = td.getId(); + if (_cluster.needsScheduling(td)) { + LOG.debug("Scheduling topology {}", topId); + int totalTasks = td.getExecutors().size(); + int origRequest = td.getNumWorkers(); + int slotsRequested = Math.min(totalTasks, origRequest); + int slotsUsed = Node.countSlotsUsed(topId, _nodes); + int slotsFree = Node.countFreeSlotsAlive(_nodes); + // Check to see if we have enough slots before trying to get them + int slotsAvailable = 0; + if (slotsRequested > slotsFree) { + slotsAvailable = NodePool.slotsAvailable(lesserPools); + } + int slotsToUse = Math.min(slotsRequested - slotsUsed, slotsFree + slotsAvailable); + int executorsNotRunning = _cluster.getUnassignedExecutors(td).size(); + LOG.debug("Slots... requested {} used {} free {} available {} to be used {}, executors not running {}", new Object[] { slotsRequested, + slotsUsed, slotsFree, slotsAvailable, slotsToUse, executorsNotRunning }); + if (slotsToUse <= 0) { + if (executorsNotRunning > 0) { + _cluster.setStatus(topId, "Not fully scheduled (No free slots in default pool) " + executorsNotRunning + " executors not scheduled"); + } else { + if (slotsUsed < slotsRequested) { + _cluster.setStatus(topId, "Running with fewer slots than requested (" + slotsUsed + "/" + origRequest + ")"); + } else { // slotsUsed < origRequest + _cluster.setStatus(topId, "Fully Scheduled (requested " + origRequest + " slots, but could only use " + slotsUsed + ")"); + } + } + continue; + } + + int slotsNeeded = slotsToUse - slotsFree; + if (slotsNeeded > 0) { + _nodes.addAll(NodePool.takeNodesBySlot(slotsNeeded, lesserPools)); + } + + if (executorsNotRunning <= 0) { + // There are free slots that we can take advantage of now. + for (Node n : _nodes) { + n.freeTopology(topId, _cluster); + } + slotsFree = Node.countFreeSlotsAlive(_nodes); + slotsToUse = Math.min(slotsRequested, slotsFree); + } + + RoundRobinSlotScheduler slotSched = new RoundRobinSlotScheduler(td, slotsToUse, _cluster); + + LinkedList<Node> nodes = new LinkedList<Node>(_nodes); + while (true) { + Node n = null; + do { + if (nodes.isEmpty()) { + throw new IllegalStateException("This should not happen, we" + " messed up and did not get enough slots"); + } + n = nodes.peekFirst(); + if (n.totalSlotsFree() == 0) { + nodes.remove(); + n = null; + } + } while (n == null); + if (!slotSched.assignSlotTo(n)) { + break; + } + } + int afterSchedSlotsUsed = Node.countSlotsUsed(topId, _nodes); + if (afterSchedSlotsUsed < slotsRequested) { + _cluster.setStatus(topId, "Running with fewer slots than requested (" + afterSchedSlotsUsed + "/" + origRequest + ")"); + } else if (afterSchedSlotsUsed < origRequest) { + _cluster.setStatus(topId, "Fully Scheduled (requested " + origRequest + " slots, but could only use " + afterSchedSlotsUsed + ")"); + } else { + _cluster.setStatus(topId, "Fully Scheduled"); + } + } else { + _cluster.setStatus(topId, "Fully Scheduled"); + } } - } else { - _cluster.setStatus(topId,"Fully Scheduled"); - } } - } - - @Override - public String toString() { - return "DefaultPool " + _nodes.size() + " nodes " + _tds.size() + " topologies"; - } + + @Override + public String toString() { + return "DefaultPool " + _nodes.size() + " nodes " + _tds.size() + " topologies"; + } } http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/scheduler/multitenant/FreePool.java ---------------------------------------------------------------------- diff --git a/jstorm-core/src/main/java/backtype/storm/scheduler/multitenant/FreePool.java b/jstorm-core/src/main/java/backtype/storm/scheduler/multitenant/FreePool.java index c625895..239e529 100755 --- a/jstorm-core/src/main/java/backtype/storm/scheduler/multitenant/FreePool.java +++ b/jstorm-core/src/main/java/backtype/storm/scheduler/multitenant/FreePool.java @@ -34,92 +34,92 @@ import backtype.storm.scheduler.TopologyDetails; * All of the machines that currently have nothing assigned to them */ public class FreePool extends NodePool { - private static final Logger LOG = LoggerFactory.getLogger(FreePool.class); - private Set<Node> _nodes = new HashSet<Node>(); - private int _totalSlots = 0; + private static final Logger LOG = LoggerFactory.getLogger(FreePool.class); + private Set<Node> _nodes = new HashSet<Node>(); + private int _totalSlots = 0; - @Override - public void init(Cluster cluster, Map<String, Node> nodeIdToNode) { - super.init(cluster, nodeIdToNode); - for (Node n: nodeIdToNode.values()) { - if(n.isTotallyFree() && n.isAlive()) { - _nodes.add(n); - _totalSlots += n.totalSlotsFree(); - } + @Override + public void init(Cluster cluster, Map<String, Node> nodeIdToNode) { + super.init(cluster, nodeIdToNode); + for (Node n : nodeIdToNode.values()) { + if (n.isTotallyFree() && n.isAlive()) { + _nodes.add(n); + _totalSlots += n.totalSlotsFree(); + } + } + LOG.debug("Found {} nodes with {} slots", _nodes.size(), _totalSlots); } - LOG.debug("Found {} nodes with {} slots", _nodes.size(), _totalSlots); - } - - @Override - public void addTopology(TopologyDetails td) { - throw new IllegalArgumentException("The free pool cannot run any topologies"); - } - @Override - public boolean canAdd(TopologyDetails td) { - // The free pool never has anything running - return false; - } - - @Override - public Collection<Node> takeNodes(int nodesNeeded) { - HashSet<Node> ret = new HashSet<Node>(); - Iterator<Node> it = _nodes.iterator(); - while (it.hasNext() && nodesNeeded > ret.size()) { - Node n = it.next(); - ret.add(n); - _totalSlots -= n.totalSlotsFree(); - it.remove(); + @Override + public void addTopology(TopologyDetails td) { + throw new IllegalArgumentException("The free pool cannot run any topologies"); } - return ret; - } - - @Override - public int nodesAvailable() { - return _nodes.size(); - } - @Override - public int slotsAvailable() { - return _totalSlots; - } + @Override + public boolean canAdd(TopologyDetails td) { + // The free pool never has anything running + return false; + } + + @Override + public Collection<Node> takeNodes(int nodesNeeded) { + HashSet<Node> ret = new HashSet<Node>(); + Iterator<Node> it = _nodes.iterator(); + while (it.hasNext() && nodesNeeded > ret.size()) { + Node n = it.next(); + ret.add(n); + _totalSlots -= n.totalSlotsFree(); + it.remove(); + } + return ret; + } - @Override - public Collection<Node> takeNodesBySlots(int slotsNeeded) { - HashSet<Node> ret = new HashSet<Node>(); - Iterator<Node> it = _nodes.iterator(); - while (it.hasNext() && slotsNeeded > 0) { - Node n = it.next(); - ret.add(n); - _totalSlots -= n.totalSlotsFree(); - slotsNeeded -= n.totalSlotsFree(); - it.remove(); + @Override + public int nodesAvailable() { + return _nodes.size(); } - return ret; - } - - @Override - public NodeAndSlotCounts getNodeAndSlotCountIfSlotsWereTaken(int slotsNeeded) { - int slotsFound = 0; - int nodesFound = 0; - Iterator<Node> it = _nodes.iterator(); - while (it.hasNext() && slotsNeeded > 0) { - Node n = it.next(); - nodesFound++; - int totalSlots = n.totalSlots(); - slotsFound += totalSlots; - slotsNeeded -= totalSlots; + + @Override + public int slotsAvailable() { + return _totalSlots; + } + + @Override + public Collection<Node> takeNodesBySlots(int slotsNeeded) { + HashSet<Node> ret = new HashSet<Node>(); + Iterator<Node> it = _nodes.iterator(); + while (it.hasNext() && slotsNeeded > 0) { + Node n = it.next(); + ret.add(n); + _totalSlots -= n.totalSlotsFree(); + slotsNeeded -= n.totalSlotsFree(); + it.remove(); + } + return ret; } - return new NodeAndSlotCounts(nodesFound, slotsFound); - } - @Override - public void scheduleAsNeeded(NodePool... lesserPools) { - //No topologies running so NOOP - } - - @Override - public String toString() { - return "FreePool of "+_nodes.size()+" nodes with "+_totalSlots+" slots"; - } + @Override + public NodeAndSlotCounts getNodeAndSlotCountIfSlotsWereTaken(int slotsNeeded) { + int slotsFound = 0; + int nodesFound = 0; + Iterator<Node> it = _nodes.iterator(); + while (it.hasNext() && slotsNeeded > 0) { + Node n = it.next(); + nodesFound++; + int totalSlots = n.totalSlots(); + slotsFound += totalSlots; + slotsNeeded -= totalSlots; + } + return new NodeAndSlotCounts(nodesFound, slotsFound); + } + + @Override + public void scheduleAsNeeded(NodePool... lesserPools) { + // No topologies running so NOOP + } + + @Override + public String toString() { + return "FreePool of " + _nodes.size() + " nodes with " + _totalSlots + " slots"; + } }
