add support for worker lifecycle hooks
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b03ce6b2 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b03ce6b2 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b03ce6b2 Branch: refs/heads/master Commit: b03ce6b28e0f16d11b769f75a069de0328637794 Parents: 037cd00 Author: Michael Schonfeld <mich...@schonfeld.org> Authored: Mon Nov 16 14:49:06 2015 -0500 Committer: Michael Schonfeld <mich...@schonfeld.org> Committed: Mon Nov 23 18:50:54 2015 -0500 ---------------------------------------------------------------------- .../src/clj/backtype/storm/daemon/common.clj | 30 +- .../src/clj/backtype/storm/daemon/worker.clj | 27 +- storm-core/src/clj/backtype/storm/thrift.clj | 3 + .../backtype/storm/generated/Assignment.java | 244 ++-- .../jvm/backtype/storm/generated/BoltStats.java | 340 ++--- .../storm/generated/ClusterSummary.java | 108 +- .../storm/generated/ClusterWorkerHeartbeat.java | 52 +- .../storm/generated/ComponentPageInfo.java | 220 ++-- .../backtype/storm/generated/Credentials.java | 44 +- .../backtype/storm/generated/ExecutorStats.java | 160 +-- .../storm/generated/LSApprovedWorkers.java | 44 +- .../generated/LSSupervisorAssignments.java | 48 +- .../backtype/storm/generated/LSTopoHistory.java | 64 +- .../storm/generated/LSTopoHistoryList.java | 36 +- .../storm/generated/LSWorkerHeartbeat.java | 36 +- .../storm/generated/LocalAssignment.java | 36 +- .../storm/generated/LocalStateData.java | 48 +- .../jvm/backtype/storm/generated/LogConfig.java | 48 +- .../jvm/backtype/storm/generated/Nimbus.java | 36 +- .../jvm/backtype/storm/generated/NodeInfo.java | 32 +- .../storm/generated/RebalanceOptions.java | 44 +- .../backtype/storm/generated/SpoutStats.java | 224 ++-- .../jvm/backtype/storm/generated/StormBase.java | 92 +- .../backtype/storm/generated/StormTopology.java | 251 +++- .../storm/generated/SupervisorInfo.java | 152 +-- .../storm/generated/SupervisorSummary.java | 44 +- .../storm/generated/TopologyHistoryInfo.java | 32 +- .../backtype/storm/generated/TopologyInfo.java | 164 +-- .../storm/generated/TopologyPageInfo.java | 96 +- .../backtype/storm/generated/TopologyStats.java | 220 ++-- .../backtype/storm/hooks/BaseWorkerHook.java | 34 + .../jvm/backtype/storm/hooks/IWorkerHook.java | 29 + .../storm/topology/TopologyBuilder.java | 43 +- .../storm/utils/ThriftTopologyUtils.java | 36 +- storm-core/src/py/storm/Nimbus.py | 14 +- storm-core/src/py/storm/ttypes.py | 1239 +++++++++--------- storm-core/src/storm.thrift | 1 + 37 files changed, 2330 insertions(+), 2041 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/b03ce6b2/storm-core/src/clj/backtype/storm/daemon/common.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/backtype/storm/daemon/common.clj b/storm-core/src/clj/backtype/storm/daemon/common.clj index 35ae139..9b3aab3 100644 --- a/storm-core/src/clj/backtype/storm/daemon/common.clj +++ b/storm-core/src/clj/backtype/storm/daemon/common.clj @@ -16,7 +16,8 @@ (ns backtype.storm.daemon.common (:use [backtype.storm log config util]) (:import [backtype.storm.generated StormTopology - InvalidTopologyException GlobalStreamId]) + InvalidTopologyException GlobalStreamId] + [backtype.storm.utils ThriftTopologyUtils]) (:import [backtype.storm.utils Utils]) (:import [backtype.storm.task WorkerTopologyContext]) (:import [backtype.storm Constants]) @@ -113,22 +114,23 @@ (str "Duplicate component ids: " offending)))) (doseq [f thrift/STORM-TOPOLOGY-FIELDS :let [obj-map (.getFieldValue topology f)]] - (doseq [id (keys obj-map)] - (if (Utils/isSystemId id) - (throw (InvalidTopologyException. - (str id " is not a valid component id"))))) - (doseq [obj (vals obj-map) - id (-> obj .get_common .get_streams keys)] - (if (Utils/isSystemId id) - (throw (InvalidTopologyException. - (str id " is not a valid stream id")))))) - )) + (if-not (ThriftTopologyUtils/isWorkerHook f) + (do + (doseq [id (keys obj-map)] + (if (Utils/isSystemId id) + (throw (InvalidTopologyException. + (str id " is not a valid component id"))))) + (doseq [obj (vals obj-map) + id (-> obj .get_common .get_streams keys)] + (if (Utils/isSystemId id) + (throw (InvalidTopologyException. + (str id " is not a valid stream id")))))))))) (defn all-components [^StormTopology topology] (apply merge {} - (for [f thrift/STORM-TOPOLOGY-FIELDS] - (.getFieldValue topology f) - ))) + (for [f thrift/STORM-TOPOLOGY-FIELDS] + (if-not (ThriftTopologyUtils/isWorkerHook f) + (.getFieldValue topology f))))) (defn component-conf [component] (->> component http://git-wip-us.apache.org/repos/asf/storm/blob/b03ce6b2/storm-core/src/clj/backtype/storm/daemon/worker.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/backtype/storm/daemon/worker.clj b/storm-core/src/clj/backtype/storm/daemon/worker.clj index 579d76a..f522b02 100644 --- a/storm-core/src/clj/backtype/storm/daemon/worker.clj +++ b/storm-core/src/clj/backtype/storm/daemon/worker.clj @@ -22,7 +22,8 @@ (:require [backtype.storm [disruptor :as disruptor] [cluster :as cluster]]) (:require [clojure.set :as set]) (:require [backtype.storm.messaging.loader :as msg-loader]) - (:import [java.util.concurrent Executors]) + (:import [java.util.concurrent Executors] + [backtype.storm.hooks IWorkerHook BaseWorkerHook]) (:import [java.util ArrayList HashMap]) (:import [backtype.storm.utils Utils TransferDrainer ThriftTopologyUtils WorkerBackpressureThread DisruptorQueue]) (:import [backtype.storm.grouping LoadMapping]) @@ -548,6 +549,25 @@ (reset! latest-log-config new-log-configs) (log-debug "New merged log config is " @latest-log-config)))) +(defn run-worker-start-hooks [worker] + (let [topology (:topology worker) + topo-conf (:conf worker) + worker-topology-context (worker-context worker) + task-ids (:task_ids worker) + hooks (.get_worker_hooks topology)] + (dofor [hook hooks] + (let [hook-bytes (Utils/toByteArray hook) + deser-hook (Utils/javaDeserialize hook-bytes BaseWorkerHook)] + (.start deser-hook topo-conf worker-topology-context task-ids))))) + +(defn run-worker-shutdown-hooks [worker] + (let [topology (:topology worker) + hooks (.get_worker_hooks topology)] + (dofor [hook hooks] + (let [hook-bytes (Utils/toByteArray hook) + deser-hook (Utils/javaDeserialize hook-bytes BaseWorkerHook)] + (.shutdown deser-hook))))) + ;; TODO: should worker even take the storm-id as input? this should be ;; deducable from cluster state (by searching through assignments) ;; what about if there's inconsistency in assignments? -> but nimbus @@ -604,6 +624,7 @@ _ (refresh-storm-active worker nil) + _ (run-worker-start-hooks worker) _ (reset! executors (dofor [e (:executors worker)] (executor/mk-executor worker e initial-credentials))) @@ -660,6 +681,8 @@ (close-resources worker) ;; TODO: here need to invoke the "shutdown" method of WorkerHook + (log-message "Trigger any worker shutdown hooks") + (run-worker-shutdown-hooks worker) (.remove-worker-heartbeat! (:storm-cluster-state worker) storm-id assignment-id port) (log-message "Disconnecting from storm cluster state context") @@ -738,4 +761,4 @@ (setup-default-uncaught-exception-handler) (validate-distributed-mode! conf) (let [worker (mk-worker conf nil storm-id assignment-id (Integer/parseInt port-str) worker-id)] - (add-shutdown-hook-with-force-kill-in-1-sec #(.shutdown worker))))) + (add-shutdown-hook-with-force-kill-in-1-sec #(.shutdown worker))))) \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/b03ce6b2/storm-core/src/clj/backtype/storm/thrift.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/backtype/storm/thrift.clj b/storm-core/src/clj/backtype/storm/thrift.clj index 8f4c659..545ce49 100644 --- a/storm-core/src/clj/backtype/storm/thrift.clj +++ b/storm-core/src/clj/backtype/storm/thrift.clj @@ -282,3 +282,6 @@ [StormTopology$_Fields/SPOUTS StormTopology$_Fields/STATE_SPOUTS]) +(def WORKER-HOOK-FIELD + [StormTopology$_Fields/WORKER_HOOKS]) + http://git-wip-us.apache.org/repos/asf/storm/blob/b03ce6b2/storm-core/src/jvm/backtype/storm/generated/Assignment.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/generated/Assignment.java b/storm-core/src/jvm/backtype/storm/generated/Assignment.java index 25874a4..cc9bb19 100644 --- a/storm-core/src/jvm/backtype/storm/generated/Assignment.java +++ b/storm-core/src/jvm/backtype/storm/generated/Assignment.java @@ -787,15 +787,15 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen case 2: // NODE_HOST if (schemeField.type == org.apache.thrift.protocol.TType.MAP) { { - org.apache.thrift.protocol.TMap _map524 = iprot.readMapBegin(); - struct.node_host = new HashMap<String,String>(2*_map524.size); - String _key525; - String _val526; - for (int _i527 = 0; _i527 < _map524.size; ++_i527) + org.apache.thrift.protocol.TMap _map532 = iprot.readMapBegin(); + struct.node_host = new HashMap<String,String>(2*_map532.size); + String _key533; + String _val534; + for (int _i535 = 0; _i535 < _map532.size; ++_i535) { - _key525 = iprot.readString(); - _val526 = iprot.readString(); - struct.node_host.put(_key525, _val526); + _key533 = iprot.readString(); + _val534 = iprot.readString(); + struct.node_host.put(_key533, _val534); } iprot.readMapEnd(); } @@ -807,26 +807,26 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen case 3: // EXECUTOR_NODE_PORT if (schemeField.type == org.apache.thrift.protocol.TType.MAP) { { - org.apache.thrift.protocol.TMap _map528 = iprot.readMapBegin(); - struct.executor_node_port = new HashMap<List<Long>,NodeInfo>(2*_map528.size); - List<Long> _key529; - NodeInfo _val530; - for (int _i531 = 0; _i531 < _map528.size; ++_i531) + org.apache.thrift.protocol.TMap _map536 = iprot.readMapBegin(); + struct.executor_node_port = new HashMap<List<Long>,NodeInfo>(2*_map536.size); + List<Long> _key537; + NodeInfo _val538; + for (int _i539 = 0; _i539 < _map536.size; ++_i539) { { - org.apache.thrift.protocol.TList _list532 = iprot.readListBegin(); - _key529 = new ArrayList<Long>(_list532.size); - long _elem533; - for (int _i534 = 0; _i534 < _list532.size; ++_i534) + org.apache.thrift.protocol.TList _list540 = iprot.readListBegin(); + _key537 = new ArrayList<Long>(_list540.size); + long _elem541; + for (int _i542 = 0; _i542 < _list540.size; ++_i542) { - _elem533 = iprot.readI64(); - _key529.add(_elem533); + _elem541 = iprot.readI64(); + _key537.add(_elem541); } iprot.readListEnd(); } - _val530 = new NodeInfo(); - _val530.read(iprot); - struct.executor_node_port.put(_key529, _val530); + _val538 = new NodeInfo(); + _val538.read(iprot); + struct.executor_node_port.put(_key537, _val538); } iprot.readMapEnd(); } @@ -838,25 +838,25 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen case 4: // EXECUTOR_START_TIME_SECS if (schemeField.type == org.apache.thrift.protocol.TType.MAP) { { - org.apache.thrift.protocol.TMap _map535 = iprot.readMapBegin(); - struct.executor_start_time_secs = new HashMap<List<Long>,Long>(2*_map535.size); - List<Long> _key536; - long _val537; - for (int _i538 = 0; _i538 < _map535.size; ++_i538) + org.apache.thrift.protocol.TMap _map543 = iprot.readMapBegin(); + struct.executor_start_time_secs = new HashMap<List<Long>,Long>(2*_map543.size); + List<Long> _key544; + long _val545; + for (int _i546 = 0; _i546 < _map543.size; ++_i546) { { - org.apache.thrift.protocol.TList _list539 = iprot.readListBegin(); - _key536 = new ArrayList<Long>(_list539.size); - long _elem540; - for (int _i541 = 0; _i541 < _list539.size; ++_i541) + org.apache.thrift.protocol.TList _list547 = iprot.readListBegin(); + _key544 = new ArrayList<Long>(_list547.size); + long _elem548; + for (int _i549 = 0; _i549 < _list547.size; ++_i549) { - _elem540 = iprot.readI64(); - _key536.add(_elem540); + _elem548 = iprot.readI64(); + _key544.add(_elem548); } iprot.readListEnd(); } - _val537 = iprot.readI64(); - struct.executor_start_time_secs.put(_key536, _val537); + _val545 = iprot.readI64(); + struct.executor_start_time_secs.put(_key544, _val545); } iprot.readMapEnd(); } @@ -868,17 +868,17 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen case 5: // WORKER_RESOURCES if (schemeField.type == org.apache.thrift.protocol.TType.MAP) { { - org.apache.thrift.protocol.TMap _map542 = iprot.readMapBegin(); - struct.worker_resources = new HashMap<NodeInfo,WorkerResources>(2*_map542.size); - NodeInfo _key543; - WorkerResources _val544; - for (int _i545 = 0; _i545 < _map542.size; ++_i545) + org.apache.thrift.protocol.TMap _map550 = iprot.readMapBegin(); + struct.worker_resources = new HashMap<NodeInfo,WorkerResources>(2*_map550.size); + NodeInfo _key551; + WorkerResources _val552; + for (int _i553 = 0; _i553 < _map550.size; ++_i553) { - _key543 = new NodeInfo(); - _key543.read(iprot); - _val544 = new WorkerResources(); - _val544.read(iprot); - struct.worker_resources.put(_key543, _val544); + _key551 = new NodeInfo(); + _key551.read(iprot); + _val552 = new WorkerResources(); + _val552.read(iprot); + struct.worker_resources.put(_key551, _val552); } iprot.readMapEnd(); } @@ -910,10 +910,10 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen oprot.writeFieldBegin(NODE_HOST_FIELD_DESC); { oprot.writeMapBegin(new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.STRING, struct.node_host.size())); - for (Map.Entry<String, String> _iter546 : struct.node_host.entrySet()) + for (Map.Entry<String, String> _iter554 : struct.node_host.entrySet()) { - oprot.writeString(_iter546.getKey()); - oprot.writeString(_iter546.getValue()); + oprot.writeString(_iter554.getKey()); + oprot.writeString(_iter554.getValue()); } oprot.writeMapEnd(); } @@ -925,17 +925,17 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen oprot.writeFieldBegin(EXECUTOR_NODE_PORT_FIELD_DESC); { oprot.writeMapBegin(new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.LIST, org.apache.thrift.protocol.TType.STRUCT, struct.executor_node_port.size())); - for (Map.Entry<List<Long>, NodeInfo> _iter547 : struct.executor_node_port.entrySet()) + for (Map.Entry<List<Long>, NodeInfo> _iter555 : struct.executor_node_port.entrySet()) { { - oprot.writeListBegin(new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.I64, _iter547.getKey().size())); - for (long _iter548 : _iter547.getKey()) + oprot.writeListBegin(new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.I64, _iter555.getKey().size())); + for (long _iter556 : _iter555.getKey()) { - oprot.writeI64(_iter548); + oprot.writeI64(_iter556); } oprot.writeListEnd(); } - _iter547.getValue().write(oprot); + _iter555.getValue().write(oprot); } oprot.writeMapEnd(); } @@ -947,17 +947,17 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen oprot.writeFieldBegin(EXECUTOR_START_TIME_SECS_FIELD_DESC); { oprot.writeMapBegin(new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.LIST, org.apache.thrift.protocol.TType.I64, struct.executor_start_time_secs.size())); - for (Map.Entry<List<Long>, Long> _iter549 : struct.executor_start_time_secs.entrySet()) + for (Map.Entry<List<Long>, Long> _iter557 : struct.executor_start_time_secs.entrySet()) { { - oprot.writeListBegin(new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.I64, _iter549.getKey().size())); - for (long _iter550 : _iter549.getKey()) + oprot.writeListBegin(new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.I64, _iter557.getKey().size())); + for (long _iter558 : _iter557.getKey()) { - oprot.writeI64(_iter550); + oprot.writeI64(_iter558); } oprot.writeListEnd(); } - oprot.writeI64(_iter549.getValue()); + oprot.writeI64(_iter557.getValue()); } oprot.writeMapEnd(); } @@ -969,10 +969,10 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen oprot.writeFieldBegin(WORKER_RESOURCES_FIELD_DESC); { oprot.writeMapBegin(new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRUCT, org.apache.thrift.protocol.TType.STRUCT, struct.worker_resources.size())); - for (Map.Entry<NodeInfo, WorkerResources> _iter551 : struct.worker_resources.entrySet()) + for (Map.Entry<NodeInfo, WorkerResources> _iter559 : struct.worker_resources.entrySet()) { - _iter551.getKey().write(oprot); - _iter551.getValue().write(oprot); + _iter559.getKey().write(oprot); + _iter559.getValue().write(oprot); } oprot.writeMapEnd(); } @@ -1014,52 +1014,52 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen if (struct.is_set_node_host()) { { oprot.writeI32(struct.node_host.size()); - for (Map.Entry<String, String> _iter552 : struct.node_host.entrySet()) + for (Map.Entry<String, String> _iter560 : struct.node_host.entrySet()) { - oprot.writeString(_iter552.getKey()); - oprot.writeString(_iter552.getValue()); + oprot.writeString(_iter560.getKey()); + oprot.writeString(_iter560.getValue()); } } } if (struct.is_set_executor_node_port()) { { oprot.writeI32(struct.executor_node_port.size()); - for (Map.Entry<List<Long>, NodeInfo> _iter553 : struct.executor_node_port.entrySet()) + for (Map.Entry<List<Long>, NodeInfo> _iter561 : struct.executor_node_port.entrySet()) { { - oprot.writeI32(_iter553.getKey().size()); - for (long _iter554 : _iter553.getKey()) + oprot.writeI32(_iter561.getKey().size()); + for (long _iter562 : _iter561.getKey()) { - oprot.writeI64(_iter554); + oprot.writeI64(_iter562); } } - _iter553.getValue().write(oprot); + _iter561.getValue().write(oprot); } } } if (struct.is_set_executor_start_time_secs()) { { oprot.writeI32(struct.executor_start_time_secs.size()); - for (Map.Entry<List<Long>, Long> _iter555 : struct.executor_start_time_secs.entrySet()) + for (Map.Entry<List<Long>, Long> _iter563 : struct.executor_start_time_secs.entrySet()) { { - oprot.writeI32(_iter555.getKey().size()); - for (long _iter556 : _iter555.getKey()) + oprot.writeI32(_iter563.getKey().size()); + for (long _iter564 : _iter563.getKey()) { - oprot.writeI64(_iter556); + oprot.writeI64(_iter564); } } - oprot.writeI64(_iter555.getValue()); + oprot.writeI64(_iter563.getValue()); } } } if (struct.is_set_worker_resources()) { { oprot.writeI32(struct.worker_resources.size()); - for (Map.Entry<NodeInfo, WorkerResources> _iter557 : struct.worker_resources.entrySet()) + for (Map.Entry<NodeInfo, WorkerResources> _iter565 : struct.worker_resources.entrySet()) { - _iter557.getKey().write(oprot); - _iter557.getValue().write(oprot); + _iter565.getKey().write(oprot); + _iter565.getValue().write(oprot); } } } @@ -1073,81 +1073,81 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen BitSet incoming = iprot.readBitSet(4); if (incoming.get(0)) { { - org.apache.thrift.protocol.TMap _map558 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.STRING, iprot.readI32()); - struct.node_host = new HashMap<String,String>(2*_map558.size); - String _key559; - String _val560; - for (int _i561 = 0; _i561 < _map558.size; ++_i561) + org.apache.thrift.protocol.TMap _map566 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.STRING, iprot.readI32()); + struct.node_host = new HashMap<String,String>(2*_map566.size); + String _key567; + String _val568; + for (int _i569 = 0; _i569 < _map566.size; ++_i569) { - _key559 = iprot.readString(); - _val560 = iprot.readString(); - struct.node_host.put(_key559, _val560); + _key567 = iprot.readString(); + _val568 = iprot.readString(); + struct.node_host.put(_key567, _val568); } } struct.set_node_host_isSet(true); } if (incoming.get(1)) { { - org.apache.thrift.protocol.TMap _map562 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.LIST, org.apache.thrift.protocol.TType.STRUCT, iprot.readI32()); - struct.executor_node_port = new HashMap<List<Long>,NodeInfo>(2*_map562.size); - List<Long> _key563; - NodeInfo _val564; - for (int _i565 = 0; _i565 < _map562.size; ++_i565) + org.apache.thrift.protocol.TMap _map570 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.LIST, org.apache.thrift.protocol.TType.STRUCT, iprot.readI32()); + struct.executor_node_port = new HashMap<List<Long>,NodeInfo>(2*_map570.size); + List<Long> _key571; + NodeInfo _val572; + for (int _i573 = 0; _i573 < _map570.size; ++_i573) { { - org.apache.thrift.protocol.TList _list566 = new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.I64, iprot.readI32()); - _key563 = new ArrayList<Long>(_list566.size); - long _elem567; - for (int _i568 = 0; _i568 < _list566.size; ++_i568) + org.apache.thrift.protocol.TList _list574 = new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.I64, iprot.readI32()); + _key571 = new ArrayList<Long>(_list574.size); + long _elem575; + for (int _i576 = 0; _i576 < _list574.size; ++_i576) { - _elem567 = iprot.readI64(); - _key563.add(_elem567); + _elem575 = iprot.readI64(); + _key571.add(_elem575); } } - _val564 = new NodeInfo(); - _val564.read(iprot); - struct.executor_node_port.put(_key563, _val564); + _val572 = new NodeInfo(); + _val572.read(iprot); + struct.executor_node_port.put(_key571, _val572); } } struct.set_executor_node_port_isSet(true); } if (incoming.get(2)) { { - org.apache.thrift.protocol.TMap _map569 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.LIST, org.apache.thrift.protocol.TType.I64, iprot.readI32()); - struct.executor_start_time_secs = new HashMap<List<Long>,Long>(2*_map569.size); - List<Long> _key570; - long _val571; - for (int _i572 = 0; _i572 < _map569.size; ++_i572) + org.apache.thrift.protocol.TMap _map577 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.LIST, org.apache.thrift.protocol.TType.I64, iprot.readI32()); + struct.executor_start_time_secs = new HashMap<List<Long>,Long>(2*_map577.size); + List<Long> _key578; + long _val579; + for (int _i580 = 0; _i580 < _map577.size; ++_i580) { { - org.apache.thrift.protocol.TList _list573 = new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.I64, iprot.readI32()); - _key570 = new ArrayList<Long>(_list573.size); - long _elem574; - for (int _i575 = 0; _i575 < _list573.size; ++_i575) + org.apache.thrift.protocol.TList _list581 = new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.I64, iprot.readI32()); + _key578 = new ArrayList<Long>(_list581.size); + long _elem582; + for (int _i583 = 0; _i583 < _list581.size; ++_i583) { - _elem574 = iprot.readI64(); - _key570.add(_elem574); + _elem582 = iprot.readI64(); + _key578.add(_elem582); } } - _val571 = iprot.readI64(); - struct.executor_start_time_secs.put(_key570, _val571); + _val579 = iprot.readI64(); + struct.executor_start_time_secs.put(_key578, _val579); } } struct.set_executor_start_time_secs_isSet(true); } if (incoming.get(3)) { { - org.apache.thrift.protocol.TMap _map576 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRUCT, org.apache.thrift.protocol.TType.STRUCT, iprot.readI32()); - struct.worker_resources = new HashMap<NodeInfo,WorkerResources>(2*_map576.size); - NodeInfo _key577; - WorkerResources _val578; - for (int _i579 = 0; _i579 < _map576.size; ++_i579) + org.apache.thrift.protocol.TMap _map584 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRUCT, org.apache.thrift.protocol.TType.STRUCT, iprot.readI32()); + struct.worker_resources = new HashMap<NodeInfo,WorkerResources>(2*_map584.size); + NodeInfo _key585; + WorkerResources _val586; + for (int _i587 = 0; _i587 < _map584.size; ++_i587) { - _key577 = new NodeInfo(); - _key577.read(iprot); - _val578 = new WorkerResources(); - _val578.read(iprot); - struct.worker_resources.put(_key577, _val578); + _key585 = new NodeInfo(); + _key585.read(iprot); + _val586 = new WorkerResources(); + _val586.read(iprot); + struct.worker_resources.put(_key585, _val586); } } struct.set_worker_resources_isSet(true); http://git-wip-us.apache.org/repos/asf/storm/blob/b03ce6b2/storm-core/src/jvm/backtype/storm/generated/BoltStats.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/generated/BoltStats.java b/storm-core/src/jvm/backtype/storm/generated/BoltStats.java index 6cef48a..cbadd32 100644 --- a/storm-core/src/jvm/backtype/storm/generated/BoltStats.java +++ b/storm-core/src/jvm/backtype/storm/generated/BoltStats.java @@ -881,41 +881,8 @@ public class BoltStats implements org.apache.thrift.TBase<BoltStats, BoltStats._ case 1: // ACKED if (schemeField.type == org.apache.thrift.protocol.TType.MAP) { { - org.apache.thrift.protocol.TMap _map108 = iprot.readMapBegin(); - struct.acked = new HashMap<String,Map<GlobalStreamId,Long>>(2*_map108.size); - String _key109; - Map<GlobalStreamId,Long> _val110; - for (int _i111 = 0; _i111 < _map108.size; ++_i111) - { - _key109 = iprot.readString(); - { - org.apache.thrift.protocol.TMap _map112 = iprot.readMapBegin(); - _val110 = new HashMap<GlobalStreamId,Long>(2*_map112.size); - GlobalStreamId _key113; - long _val114; - for (int _i115 = 0; _i115 < _map112.size; ++_i115) - { - _key113 = new GlobalStreamId(); - _key113.read(iprot); - _val114 = iprot.readI64(); - _val110.put(_key113, _val114); - } - iprot.readMapEnd(); - } - struct.acked.put(_key109, _val110); - } - iprot.readMapEnd(); - } - struct.set_acked_isSet(true); - } else { - org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); - } - break; - case 2: // FAILED - if (schemeField.type == org.apache.thrift.protocol.TType.MAP) { - { org.apache.thrift.protocol.TMap _map116 = iprot.readMapBegin(); - struct.failed = new HashMap<String,Map<GlobalStreamId,Long>>(2*_map116.size); + struct.acked = new HashMap<String,Map<GlobalStreamId,Long>>(2*_map116.size); String _key117; Map<GlobalStreamId,Long> _val118; for (int _i119 = 0; _i119 < _map116.size; ++_i119) @@ -935,106 +902,139 @@ public class BoltStats implements org.apache.thrift.TBase<BoltStats, BoltStats._ } iprot.readMapEnd(); } - struct.failed.put(_key117, _val118); + struct.acked.put(_key117, _val118); } iprot.readMapEnd(); } - struct.set_failed_isSet(true); + struct.set_acked_isSet(true); } else { org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } break; - case 3: // PROCESS_MS_AVG + case 2: // FAILED if (schemeField.type == org.apache.thrift.protocol.TType.MAP) { { org.apache.thrift.protocol.TMap _map124 = iprot.readMapBegin(); - struct.process_ms_avg = new HashMap<String,Map<GlobalStreamId,Double>>(2*_map124.size); + struct.failed = new HashMap<String,Map<GlobalStreamId,Long>>(2*_map124.size); String _key125; - Map<GlobalStreamId,Double> _val126; + Map<GlobalStreamId,Long> _val126; for (int _i127 = 0; _i127 < _map124.size; ++_i127) { _key125 = iprot.readString(); { org.apache.thrift.protocol.TMap _map128 = iprot.readMapBegin(); - _val126 = new HashMap<GlobalStreamId,Double>(2*_map128.size); + _val126 = new HashMap<GlobalStreamId,Long>(2*_map128.size); GlobalStreamId _key129; - double _val130; + long _val130; for (int _i131 = 0; _i131 < _map128.size; ++_i131) { _key129 = new GlobalStreamId(); _key129.read(iprot); - _val130 = iprot.readDouble(); + _val130 = iprot.readI64(); _val126.put(_key129, _val130); } iprot.readMapEnd(); } - struct.process_ms_avg.put(_key125, _val126); + struct.failed.put(_key125, _val126); } iprot.readMapEnd(); } - struct.set_process_ms_avg_isSet(true); + struct.set_failed_isSet(true); } else { org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } break; - case 4: // EXECUTED + case 3: // PROCESS_MS_AVG if (schemeField.type == org.apache.thrift.protocol.TType.MAP) { { org.apache.thrift.protocol.TMap _map132 = iprot.readMapBegin(); - struct.executed = new HashMap<String,Map<GlobalStreamId,Long>>(2*_map132.size); + struct.process_ms_avg = new HashMap<String,Map<GlobalStreamId,Double>>(2*_map132.size); String _key133; - Map<GlobalStreamId,Long> _val134; + Map<GlobalStreamId,Double> _val134; for (int _i135 = 0; _i135 < _map132.size; ++_i135) { _key133 = iprot.readString(); { org.apache.thrift.protocol.TMap _map136 = iprot.readMapBegin(); - _val134 = new HashMap<GlobalStreamId,Long>(2*_map136.size); + _val134 = new HashMap<GlobalStreamId,Double>(2*_map136.size); GlobalStreamId _key137; - long _val138; + double _val138; for (int _i139 = 0; _i139 < _map136.size; ++_i139) { _key137 = new GlobalStreamId(); _key137.read(iprot); - _val138 = iprot.readI64(); + _val138 = iprot.readDouble(); _val134.put(_key137, _val138); } iprot.readMapEnd(); } - struct.executed.put(_key133, _val134); + struct.process_ms_avg.put(_key133, _val134); } iprot.readMapEnd(); } - struct.set_executed_isSet(true); + struct.set_process_ms_avg_isSet(true); } else { org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } break; - case 5: // EXECUTE_MS_AVG + case 4: // EXECUTED if (schemeField.type == org.apache.thrift.protocol.TType.MAP) { { org.apache.thrift.protocol.TMap _map140 = iprot.readMapBegin(); - struct.execute_ms_avg = new HashMap<String,Map<GlobalStreamId,Double>>(2*_map140.size); + struct.executed = new HashMap<String,Map<GlobalStreamId,Long>>(2*_map140.size); String _key141; - Map<GlobalStreamId,Double> _val142; + Map<GlobalStreamId,Long> _val142; for (int _i143 = 0; _i143 < _map140.size; ++_i143) { _key141 = iprot.readString(); { org.apache.thrift.protocol.TMap _map144 = iprot.readMapBegin(); - _val142 = new HashMap<GlobalStreamId,Double>(2*_map144.size); + _val142 = new HashMap<GlobalStreamId,Long>(2*_map144.size); GlobalStreamId _key145; - double _val146; + long _val146; for (int _i147 = 0; _i147 < _map144.size; ++_i147) { _key145 = new GlobalStreamId(); _key145.read(iprot); - _val146 = iprot.readDouble(); + _val146 = iprot.readI64(); _val142.put(_key145, _val146); } iprot.readMapEnd(); } - struct.execute_ms_avg.put(_key141, _val142); + struct.executed.put(_key141, _val142); + } + iprot.readMapEnd(); + } + struct.set_executed_isSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; + case 5: // EXECUTE_MS_AVG + if (schemeField.type == org.apache.thrift.protocol.TType.MAP) { + { + org.apache.thrift.protocol.TMap _map148 = iprot.readMapBegin(); + struct.execute_ms_avg = new HashMap<String,Map<GlobalStreamId,Double>>(2*_map148.size); + String _key149; + Map<GlobalStreamId,Double> _val150; + for (int _i151 = 0; _i151 < _map148.size; ++_i151) + { + _key149 = iprot.readString(); + { + org.apache.thrift.protocol.TMap _map152 = iprot.readMapBegin(); + _val150 = new HashMap<GlobalStreamId,Double>(2*_map152.size); + GlobalStreamId _key153; + double _val154; + for (int _i155 = 0; _i155 < _map152.size; ++_i155) + { + _key153 = new GlobalStreamId(); + _key153.read(iprot); + _val154 = iprot.readDouble(); + _val150.put(_key153, _val154); + } + iprot.readMapEnd(); + } + struct.execute_ms_avg.put(_key149, _val150); } iprot.readMapEnd(); } @@ -1060,15 +1060,15 @@ public class BoltStats implements org.apache.thrift.TBase<BoltStats, BoltStats._ oprot.writeFieldBegin(ACKED_FIELD_DESC); { oprot.writeMapBegin(new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.MAP, struct.acked.size())); - for (Map.Entry<String, Map<GlobalStreamId,Long>> _iter148 : struct.acked.entrySet()) + for (Map.Entry<String, Map<GlobalStreamId,Long>> _iter156 : struct.acked.entrySet()) { - oprot.writeString(_iter148.getKey()); + oprot.writeString(_iter156.getKey()); { - oprot.writeMapBegin(new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRUCT, org.apache.thrift.protocol.TType.I64, _iter148.getValue().size())); - for (Map.Entry<GlobalStreamId, Long> _iter149 : _iter148.getValue().entrySet()) + oprot.writeMapBegin(new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRUCT, org.apache.thrift.protocol.TType.I64, _iter156.getValue().size())); + for (Map.Entry<GlobalStreamId, Long> _iter157 : _iter156.getValue().entrySet()) { - _iter149.getKey().write(oprot); - oprot.writeI64(_iter149.getValue()); + _iter157.getKey().write(oprot); + oprot.writeI64(_iter157.getValue()); } oprot.writeMapEnd(); } @@ -1081,15 +1081,15 @@ public class BoltStats implements org.apache.thrift.TBase<BoltStats, BoltStats._ oprot.writeFieldBegin(FAILED_FIELD_DESC); { oprot.writeMapBegin(new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.MAP, struct.failed.size())); - for (Map.Entry<String, Map<GlobalStreamId,Long>> _iter150 : struct.failed.entrySet()) + for (Map.Entry<String, Map<GlobalStreamId,Long>> _iter158 : struct.failed.entrySet()) { - oprot.writeString(_iter150.getKey()); + oprot.writeString(_iter158.getKey()); { - oprot.writeMapBegin(new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRUCT, org.apache.thrift.protocol.TType.I64, _iter150.getValue().size())); - for (Map.Entry<GlobalStreamId, Long> _iter151 : _iter150.getValue().entrySet()) + oprot.writeMapBegin(new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRUCT, org.apache.thrift.protocol.TType.I64, _iter158.getValue().size())); + for (Map.Entry<GlobalStreamId, Long> _iter159 : _iter158.getValue().entrySet()) { - _iter151.getKey().write(oprot); - oprot.writeI64(_iter151.getValue()); + _iter159.getKey().write(oprot); + oprot.writeI64(_iter159.getValue()); } oprot.writeMapEnd(); } @@ -1102,15 +1102,15 @@ public class BoltStats implements org.apache.thrift.TBase<BoltStats, BoltStats._ oprot.writeFieldBegin(PROCESS_MS_AVG_FIELD_DESC); { oprot.writeMapBegin(new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.MAP, struct.process_ms_avg.size())); - for (Map.Entry<String, Map<GlobalStreamId,Double>> _iter152 : struct.process_ms_avg.entrySet()) + for (Map.Entry<String, Map<GlobalStreamId,Double>> _iter160 : struct.process_ms_avg.entrySet()) { - oprot.writeString(_iter152.getKey()); + oprot.writeString(_iter160.getKey()); { - oprot.writeMapBegin(new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRUCT, org.apache.thrift.protocol.TType.DOUBLE, _iter152.getValue().size())); - for (Map.Entry<GlobalStreamId, Double> _iter153 : _iter152.getValue().entrySet()) + oprot.writeMapBegin(new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRUCT, org.apache.thrift.protocol.TType.DOUBLE, _iter160.getValue().size())); + for (Map.Entry<GlobalStreamId, Double> _iter161 : _iter160.getValue().entrySet()) { - _iter153.getKey().write(oprot); - oprot.writeDouble(_iter153.getValue()); + _iter161.getKey().write(oprot); + oprot.writeDouble(_iter161.getValue()); } oprot.writeMapEnd(); } @@ -1123,15 +1123,15 @@ public class BoltStats implements org.apache.thrift.TBase<BoltStats, BoltStats._ oprot.writeFieldBegin(EXECUTED_FIELD_DESC); { oprot.writeMapBegin(new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.MAP, struct.executed.size())); - for (Map.Entry<String, Map<GlobalStreamId,Long>> _iter154 : struct.executed.entrySet()) + for (Map.Entry<String, Map<GlobalStreamId,Long>> _iter162 : struct.executed.entrySet()) { - oprot.writeString(_iter154.getKey()); + oprot.writeString(_iter162.getKey()); { - oprot.writeMapBegin(new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRUCT, org.apache.thrift.protocol.TType.I64, _iter154.getValue().size())); - for (Map.Entry<GlobalStreamId, Long> _iter155 : _iter154.getValue().entrySet()) + oprot.writeMapBegin(new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRUCT, org.apache.thrift.protocol.TType.I64, _iter162.getValue().size())); + for (Map.Entry<GlobalStreamId, Long> _iter163 : _iter162.getValue().entrySet()) { - _iter155.getKey().write(oprot); - oprot.writeI64(_iter155.getValue()); + _iter163.getKey().write(oprot); + oprot.writeI64(_iter163.getValue()); } oprot.writeMapEnd(); } @@ -1144,15 +1144,15 @@ public class BoltStats implements org.apache.thrift.TBase<BoltStats, BoltStats._ oprot.writeFieldBegin(EXECUTE_MS_AVG_FIELD_DESC); { oprot.writeMapBegin(new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.MAP, struct.execute_ms_avg.size())); - for (Map.Entry<String, Map<GlobalStreamId,Double>> _iter156 : struct.execute_ms_avg.entrySet()) + for (Map.Entry<String, Map<GlobalStreamId,Double>> _iter164 : struct.execute_ms_avg.entrySet()) { - oprot.writeString(_iter156.getKey()); + oprot.writeString(_iter164.getKey()); { - oprot.writeMapBegin(new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRUCT, org.apache.thrift.protocol.TType.DOUBLE, _iter156.getValue().size())); - for (Map.Entry<GlobalStreamId, Double> _iter157 : _iter156.getValue().entrySet()) + oprot.writeMapBegin(new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRUCT, org.apache.thrift.protocol.TType.DOUBLE, _iter164.getValue().size())); + for (Map.Entry<GlobalStreamId, Double> _iter165 : _iter164.getValue().entrySet()) { - _iter157.getKey().write(oprot); - oprot.writeDouble(_iter157.getValue()); + _iter165.getKey().write(oprot); + oprot.writeDouble(_iter165.getValue()); } oprot.writeMapEnd(); } @@ -1180,75 +1180,75 @@ public class BoltStats implements org.apache.thrift.TBase<BoltStats, BoltStats._ TTupleProtocol oprot = (TTupleProtocol) prot; { oprot.writeI32(struct.acked.size()); - for (Map.Entry<String, Map<GlobalStreamId,Long>> _iter158 : struct.acked.entrySet()) + for (Map.Entry<String, Map<GlobalStreamId,Long>> _iter166 : struct.acked.entrySet()) { - oprot.writeString(_iter158.getKey()); + oprot.writeString(_iter166.getKey()); { - oprot.writeI32(_iter158.getValue().size()); - for (Map.Entry<GlobalStreamId, Long> _iter159 : _iter158.getValue().entrySet()) + oprot.writeI32(_iter166.getValue().size()); + for (Map.Entry<GlobalStreamId, Long> _iter167 : _iter166.getValue().entrySet()) { - _iter159.getKey().write(oprot); - oprot.writeI64(_iter159.getValue()); + _iter167.getKey().write(oprot); + oprot.writeI64(_iter167.getValue()); } } } } { oprot.writeI32(struct.failed.size()); - for (Map.Entry<String, Map<GlobalStreamId,Long>> _iter160 : struct.failed.entrySet()) + for (Map.Entry<String, Map<GlobalStreamId,Long>> _iter168 : struct.failed.entrySet()) { - oprot.writeString(_iter160.getKey()); + oprot.writeString(_iter168.getKey()); { - oprot.writeI32(_iter160.getValue().size()); - for (Map.Entry<GlobalStreamId, Long> _iter161 : _iter160.getValue().entrySet()) + oprot.writeI32(_iter168.getValue().size()); + for (Map.Entry<GlobalStreamId, Long> _iter169 : _iter168.getValue().entrySet()) { - _iter161.getKey().write(oprot); - oprot.writeI64(_iter161.getValue()); + _iter169.getKey().write(oprot); + oprot.writeI64(_iter169.getValue()); } } } } { oprot.writeI32(struct.process_ms_avg.size()); - for (Map.Entry<String, Map<GlobalStreamId,Double>> _iter162 : struct.process_ms_avg.entrySet()) + for (Map.Entry<String, Map<GlobalStreamId,Double>> _iter170 : struct.process_ms_avg.entrySet()) { - oprot.writeString(_iter162.getKey()); + oprot.writeString(_iter170.getKey()); { - oprot.writeI32(_iter162.getValue().size()); - for (Map.Entry<GlobalStreamId, Double> _iter163 : _iter162.getValue().entrySet()) + oprot.writeI32(_iter170.getValue().size()); + for (Map.Entry<GlobalStreamId, Double> _iter171 : _iter170.getValue().entrySet()) { - _iter163.getKey().write(oprot); - oprot.writeDouble(_iter163.getValue()); + _iter171.getKey().write(oprot); + oprot.writeDouble(_iter171.getValue()); } } } } { oprot.writeI32(struct.executed.size()); - for (Map.Entry<String, Map<GlobalStreamId,Long>> _iter164 : struct.executed.entrySet()) + for (Map.Entry<String, Map<GlobalStreamId,Long>> _iter172 : struct.executed.entrySet()) { - oprot.writeString(_iter164.getKey()); + oprot.writeString(_iter172.getKey()); { - oprot.writeI32(_iter164.getValue().size()); - for (Map.Entry<GlobalStreamId, Long> _iter165 : _iter164.getValue().entrySet()) + oprot.writeI32(_iter172.getValue().size()); + for (Map.Entry<GlobalStreamId, Long> _iter173 : _iter172.getValue().entrySet()) { - _iter165.getKey().write(oprot); - oprot.writeI64(_iter165.getValue()); + _iter173.getKey().write(oprot); + oprot.writeI64(_iter173.getValue()); } } } } { oprot.writeI32(struct.execute_ms_avg.size()); - for (Map.Entry<String, Map<GlobalStreamId,Double>> _iter166 : struct.execute_ms_avg.entrySet()) + for (Map.Entry<String, Map<GlobalStreamId,Double>> _iter174 : struct.execute_ms_avg.entrySet()) { - oprot.writeString(_iter166.getKey()); + oprot.writeString(_iter174.getKey()); { - oprot.writeI32(_iter166.getValue().size()); - for (Map.Entry<GlobalStreamId, Double> _iter167 : _iter166.getValue().entrySet()) + oprot.writeI32(_iter174.getValue().size()); + for (Map.Entry<GlobalStreamId, Double> _iter175 : _iter174.getValue().entrySet()) { - _iter167.getKey().write(oprot); - oprot.writeDouble(_iter167.getValue()); + _iter175.getKey().write(oprot); + oprot.writeDouble(_iter175.getValue()); } } } @@ -1259,33 +1259,8 @@ public class BoltStats implements org.apache.thrift.TBase<BoltStats, BoltStats._ public void read(org.apache.thrift.protocol.TProtocol prot, BoltStats struct) throws org.apache.thrift.TException { TTupleProtocol iprot = (TTupleProtocol) prot; { - org.apache.thrift.protocol.TMap _map168 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.MAP, iprot.readI32()); - struct.acked = new HashMap<String,Map<GlobalStreamId,Long>>(2*_map168.size); - String _key169; - Map<GlobalStreamId,Long> _val170; - for (int _i171 = 0; _i171 < _map168.size; ++_i171) - { - _key169 = iprot.readString(); - { - org.apache.thrift.protocol.TMap _map172 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRUCT, org.apache.thrift.protocol.TType.I64, iprot.readI32()); - _val170 = new HashMap<GlobalStreamId,Long>(2*_map172.size); - GlobalStreamId _key173; - long _val174; - for (int _i175 = 0; _i175 < _map172.size; ++_i175) - { - _key173 = new GlobalStreamId(); - _key173.read(iprot); - _val174 = iprot.readI64(); - _val170.put(_key173, _val174); - } - } - struct.acked.put(_key169, _val170); - } - } - struct.set_acked_isSet(true); - { org.apache.thrift.protocol.TMap _map176 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.MAP, iprot.readI32()); - struct.failed = new HashMap<String,Map<GlobalStreamId,Long>>(2*_map176.size); + struct.acked = new HashMap<String,Map<GlobalStreamId,Long>>(2*_map176.size); String _key177; Map<GlobalStreamId,Long> _val178; for (int _i179 = 0; _i179 < _map176.size; ++_i179) @@ -1304,82 +1279,107 @@ public class BoltStats implements org.apache.thrift.TBase<BoltStats, BoltStats._ _val178.put(_key181, _val182); } } - struct.failed.put(_key177, _val178); + struct.acked.put(_key177, _val178); } } - struct.set_failed_isSet(true); + struct.set_acked_isSet(true); { org.apache.thrift.protocol.TMap _map184 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.MAP, iprot.readI32()); - struct.process_ms_avg = new HashMap<String,Map<GlobalStreamId,Double>>(2*_map184.size); + struct.failed = new HashMap<String,Map<GlobalStreamId,Long>>(2*_map184.size); String _key185; - Map<GlobalStreamId,Double> _val186; + Map<GlobalStreamId,Long> _val186; for (int _i187 = 0; _i187 < _map184.size; ++_i187) { _key185 = iprot.readString(); { - org.apache.thrift.protocol.TMap _map188 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRUCT, org.apache.thrift.protocol.TType.DOUBLE, iprot.readI32()); - _val186 = new HashMap<GlobalStreamId,Double>(2*_map188.size); + org.apache.thrift.protocol.TMap _map188 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRUCT, org.apache.thrift.protocol.TType.I64, iprot.readI32()); + _val186 = new HashMap<GlobalStreamId,Long>(2*_map188.size); GlobalStreamId _key189; - double _val190; + long _val190; for (int _i191 = 0; _i191 < _map188.size; ++_i191) { _key189 = new GlobalStreamId(); _key189.read(iprot); - _val190 = iprot.readDouble(); + _val190 = iprot.readI64(); _val186.put(_key189, _val190); } } - struct.process_ms_avg.put(_key185, _val186); + struct.failed.put(_key185, _val186); } } - struct.set_process_ms_avg_isSet(true); + struct.set_failed_isSet(true); { org.apache.thrift.protocol.TMap _map192 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.MAP, iprot.readI32()); - struct.executed = new HashMap<String,Map<GlobalStreamId,Long>>(2*_map192.size); + struct.process_ms_avg = new HashMap<String,Map<GlobalStreamId,Double>>(2*_map192.size); String _key193; - Map<GlobalStreamId,Long> _val194; + Map<GlobalStreamId,Double> _val194; for (int _i195 = 0; _i195 < _map192.size; ++_i195) { _key193 = iprot.readString(); { - org.apache.thrift.protocol.TMap _map196 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRUCT, org.apache.thrift.protocol.TType.I64, iprot.readI32()); - _val194 = new HashMap<GlobalStreamId,Long>(2*_map196.size); + org.apache.thrift.protocol.TMap _map196 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRUCT, org.apache.thrift.protocol.TType.DOUBLE, iprot.readI32()); + _val194 = new HashMap<GlobalStreamId,Double>(2*_map196.size); GlobalStreamId _key197; - long _val198; + double _val198; for (int _i199 = 0; _i199 < _map196.size; ++_i199) { _key197 = new GlobalStreamId(); _key197.read(iprot); - _val198 = iprot.readI64(); + _val198 = iprot.readDouble(); _val194.put(_key197, _val198); } } - struct.executed.put(_key193, _val194); + struct.process_ms_avg.put(_key193, _val194); } } - struct.set_executed_isSet(true); + struct.set_process_ms_avg_isSet(true); { org.apache.thrift.protocol.TMap _map200 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.MAP, iprot.readI32()); - struct.execute_ms_avg = new HashMap<String,Map<GlobalStreamId,Double>>(2*_map200.size); + struct.executed = new HashMap<String,Map<GlobalStreamId,Long>>(2*_map200.size); String _key201; - Map<GlobalStreamId,Double> _val202; + Map<GlobalStreamId,Long> _val202; for (int _i203 = 0; _i203 < _map200.size; ++_i203) { _key201 = iprot.readString(); { - org.apache.thrift.protocol.TMap _map204 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRUCT, org.apache.thrift.protocol.TType.DOUBLE, iprot.readI32()); - _val202 = new HashMap<GlobalStreamId,Double>(2*_map204.size); + org.apache.thrift.protocol.TMap _map204 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRUCT, org.apache.thrift.protocol.TType.I64, iprot.readI32()); + _val202 = new HashMap<GlobalStreamId,Long>(2*_map204.size); GlobalStreamId _key205; - double _val206; + long _val206; for (int _i207 = 0; _i207 < _map204.size; ++_i207) { _key205 = new GlobalStreamId(); _key205.read(iprot); - _val206 = iprot.readDouble(); + _val206 = iprot.readI64(); _val202.put(_key205, _val206); } } - struct.execute_ms_avg.put(_key201, _val202); + struct.executed.put(_key201, _val202); + } + } + struct.set_executed_isSet(true); + { + org.apache.thrift.protocol.TMap _map208 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.MAP, iprot.readI32()); + struct.execute_ms_avg = new HashMap<String,Map<GlobalStreamId,Double>>(2*_map208.size); + String _key209; + Map<GlobalStreamId,Double> _val210; + for (int _i211 = 0; _i211 < _map208.size; ++_i211) + { + _key209 = iprot.readString(); + { + org.apache.thrift.protocol.TMap _map212 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRUCT, org.apache.thrift.protocol.TType.DOUBLE, iprot.readI32()); + _val210 = new HashMap<GlobalStreamId,Double>(2*_map212.size); + GlobalStreamId _key213; + double _val214; + for (int _i215 = 0; _i215 < _map212.size; ++_i215) + { + _key213 = new GlobalStreamId(); + _key213.read(iprot); + _val214 = iprot.readDouble(); + _val210.put(_key213, _val214); + } + } + struct.execute_ms_avg.put(_key209, _val210); } } struct.set_execute_ms_avg_isSet(true); http://git-wip-us.apache.org/repos/asf/storm/blob/b03ce6b2/storm-core/src/jvm/backtype/storm/generated/ClusterSummary.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/generated/ClusterSummary.java b/storm-core/src/jvm/backtype/storm/generated/ClusterSummary.java index 5292b78..9c42427 100644 --- a/storm-core/src/jvm/backtype/storm/generated/ClusterSummary.java +++ b/storm-core/src/jvm/backtype/storm/generated/ClusterSummary.java @@ -664,14 +664,14 @@ public class ClusterSummary implements org.apache.thrift.TBase<ClusterSummary, C case 1: // SUPERVISORS if (schemeField.type == org.apache.thrift.protocol.TType.LIST) { { - org.apache.thrift.protocol.TList _list84 = iprot.readListBegin(); - struct.supervisors = new ArrayList<SupervisorSummary>(_list84.size); - SupervisorSummary _elem85; - for (int _i86 = 0; _i86 < _list84.size; ++_i86) + org.apache.thrift.protocol.TList _list92 = iprot.readListBegin(); + struct.supervisors = new ArrayList<SupervisorSummary>(_list92.size); + SupervisorSummary _elem93; + for (int _i94 = 0; _i94 < _list92.size; ++_i94) { - _elem85 = new SupervisorSummary(); - _elem85.read(iprot); - struct.supervisors.add(_elem85); + _elem93 = new SupervisorSummary(); + _elem93.read(iprot); + struct.supervisors.add(_elem93); } iprot.readListEnd(); } @@ -691,14 +691,14 @@ public class ClusterSummary implements org.apache.thrift.TBase<ClusterSummary, C case 3: // TOPOLOGIES if (schemeField.type == org.apache.thrift.protocol.TType.LIST) { { - org.apache.thrift.protocol.TList _list87 = iprot.readListBegin(); - struct.topologies = new ArrayList<TopologySummary>(_list87.size); - TopologySummary _elem88; - for (int _i89 = 0; _i89 < _list87.size; ++_i89) + org.apache.thrift.protocol.TList _list95 = iprot.readListBegin(); + struct.topologies = new ArrayList<TopologySummary>(_list95.size); + TopologySummary _elem96; + for (int _i97 = 0; _i97 < _list95.size; ++_i97) { - _elem88 = new TopologySummary(); - _elem88.read(iprot); - struct.topologies.add(_elem88); + _elem96 = new TopologySummary(); + _elem96.read(iprot); + struct.topologies.add(_elem96); } iprot.readListEnd(); } @@ -710,14 +710,14 @@ public class ClusterSummary implements org.apache.thrift.TBase<ClusterSummary, C case 4: // NIMBUSES if (schemeField.type == org.apache.thrift.protocol.TType.LIST) { { - org.apache.thrift.protocol.TList _list90 = iprot.readListBegin(); - struct.nimbuses = new ArrayList<NimbusSummary>(_list90.size); - NimbusSummary _elem91; - for (int _i92 = 0; _i92 < _list90.size; ++_i92) + org.apache.thrift.protocol.TList _list98 = iprot.readListBegin(); + struct.nimbuses = new ArrayList<NimbusSummary>(_list98.size); + NimbusSummary _elem99; + for (int _i100 = 0; _i100 < _list98.size; ++_i100) { - _elem91 = new NimbusSummary(); - _elem91.read(iprot); - struct.nimbuses.add(_elem91); + _elem99 = new NimbusSummary(); + _elem99.read(iprot); + struct.nimbuses.add(_elem99); } iprot.readListEnd(); } @@ -743,9 +743,9 @@ public class ClusterSummary implements org.apache.thrift.TBase<ClusterSummary, C oprot.writeFieldBegin(SUPERVISORS_FIELD_DESC); { oprot.writeListBegin(new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRUCT, struct.supervisors.size())); - for (SupervisorSummary _iter93 : struct.supervisors) + for (SupervisorSummary _iter101 : struct.supervisors) { - _iter93.write(oprot); + _iter101.write(oprot); } oprot.writeListEnd(); } @@ -760,9 +760,9 @@ public class ClusterSummary implements org.apache.thrift.TBase<ClusterSummary, C oprot.writeFieldBegin(TOPOLOGIES_FIELD_DESC); { oprot.writeListBegin(new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRUCT, struct.topologies.size())); - for (TopologySummary _iter94 : struct.topologies) + for (TopologySummary _iter102 : struct.topologies) { - _iter94.write(oprot); + _iter102.write(oprot); } oprot.writeListEnd(); } @@ -772,9 +772,9 @@ public class ClusterSummary implements org.apache.thrift.TBase<ClusterSummary, C oprot.writeFieldBegin(NIMBUSES_FIELD_DESC); { oprot.writeListBegin(new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRUCT, struct.nimbuses.size())); - for (NimbusSummary _iter95 : struct.nimbuses) + for (NimbusSummary _iter103 : struct.nimbuses) { - _iter95.write(oprot); + _iter103.write(oprot); } oprot.writeListEnd(); } @@ -799,23 +799,23 @@ public class ClusterSummary implements org.apache.thrift.TBase<ClusterSummary, C TTupleProtocol oprot = (TTupleProtocol) prot; { oprot.writeI32(struct.supervisors.size()); - for (SupervisorSummary _iter96 : struct.supervisors) + for (SupervisorSummary _iter104 : struct.supervisors) { - _iter96.write(oprot); + _iter104.write(oprot); } } { oprot.writeI32(struct.topologies.size()); - for (TopologySummary _iter97 : struct.topologies) + for (TopologySummary _iter105 : struct.topologies) { - _iter97.write(oprot); + _iter105.write(oprot); } } { oprot.writeI32(struct.nimbuses.size()); - for (NimbusSummary _iter98 : struct.nimbuses) + for (NimbusSummary _iter106 : struct.nimbuses) { - _iter98.write(oprot); + _iter106.write(oprot); } } BitSet optionals = new BitSet(); @@ -832,38 +832,38 @@ public class ClusterSummary implements org.apache.thrift.TBase<ClusterSummary, C public void read(org.apache.thrift.protocol.TProtocol prot, ClusterSummary struct) throws org.apache.thrift.TException { TTupleProtocol iprot = (TTupleProtocol) prot; { - org.apache.thrift.protocol.TList _list99 = new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRUCT, iprot.readI32()); - struct.supervisors = new ArrayList<SupervisorSummary>(_list99.size); - SupervisorSummary _elem100; - for (int _i101 = 0; _i101 < _list99.size; ++_i101) + org.apache.thrift.protocol.TList _list107 = new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRUCT, iprot.readI32()); + struct.supervisors = new ArrayList<SupervisorSummary>(_list107.size); + SupervisorSummary _elem108; + for (int _i109 = 0; _i109 < _list107.size; ++_i109) { - _elem100 = new SupervisorSummary(); - _elem100.read(iprot); - struct.supervisors.add(_elem100); + _elem108 = new SupervisorSummary(); + _elem108.read(iprot); + struct.supervisors.add(_elem108); } } struct.set_supervisors_isSet(true); { - org.apache.thrift.protocol.TList _list102 = new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRUCT, iprot.readI32()); - struct.topologies = new ArrayList<TopologySummary>(_list102.size); - TopologySummary _elem103; - for (int _i104 = 0; _i104 < _list102.size; ++_i104) + org.apache.thrift.protocol.TList _list110 = new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRUCT, iprot.readI32()); + struct.topologies = new ArrayList<TopologySummary>(_list110.size); + TopologySummary _elem111; + for (int _i112 = 0; _i112 < _list110.size; ++_i112) { - _elem103 = new TopologySummary(); - _elem103.read(iprot); - struct.topologies.add(_elem103); + _elem111 = new TopologySummary(); + _elem111.read(iprot); + struct.topologies.add(_elem111); } } struct.set_topologies_isSet(true); { - org.apache.thrift.protocol.TList _list105 = new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRUCT, iprot.readI32()); - struct.nimbuses = new ArrayList<NimbusSummary>(_list105.size); - NimbusSummary _elem106; - for (int _i107 = 0; _i107 < _list105.size; ++_i107) + org.apache.thrift.protocol.TList _list113 = new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRUCT, iprot.readI32()); + struct.nimbuses = new ArrayList<NimbusSummary>(_list113.size); + NimbusSummary _elem114; + for (int _i115 = 0; _i115 < _list113.size; ++_i115) { - _elem106 = new NimbusSummary(); - _elem106.read(iprot); - struct.nimbuses.add(_elem106); + _elem114 = new NimbusSummary(); + _elem114.read(iprot); + struct.nimbuses.add(_elem114); } } struct.set_nimbuses_isSet(true); http://git-wip-us.apache.org/repos/asf/storm/blob/b03ce6b2/storm-core/src/jvm/backtype/storm/generated/ClusterWorkerHeartbeat.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/generated/ClusterWorkerHeartbeat.java b/storm-core/src/jvm/backtype/storm/generated/ClusterWorkerHeartbeat.java index 0ac0352..a1b7e2e 100644 --- a/storm-core/src/jvm/backtype/storm/generated/ClusterWorkerHeartbeat.java +++ b/storm-core/src/jvm/backtype/storm/generated/ClusterWorkerHeartbeat.java @@ -635,17 +635,17 @@ public class ClusterWorkerHeartbeat implements org.apache.thrift.TBase<ClusterWo case 2: // EXECUTOR_STATS if (schemeField.type == org.apache.thrift.protocol.TType.MAP) { { - org.apache.thrift.protocol.TMap _map600 = iprot.readMapBegin(); - struct.executor_stats = new HashMap<ExecutorInfo,ExecutorStats>(2*_map600.size); - ExecutorInfo _key601; - ExecutorStats _val602; - for (int _i603 = 0; _i603 < _map600.size; ++_i603) + org.apache.thrift.protocol.TMap _map608 = iprot.readMapBegin(); + struct.executor_stats = new HashMap<ExecutorInfo,ExecutorStats>(2*_map608.size); + ExecutorInfo _key609; + ExecutorStats _val610; + for (int _i611 = 0; _i611 < _map608.size; ++_i611) { - _key601 = new ExecutorInfo(); - _key601.read(iprot); - _val602 = new ExecutorStats(); - _val602.read(iprot); - struct.executor_stats.put(_key601, _val602); + _key609 = new ExecutorInfo(); + _key609.read(iprot); + _val610 = new ExecutorStats(); + _val610.read(iprot); + struct.executor_stats.put(_key609, _val610); } iprot.readMapEnd(); } @@ -692,10 +692,10 @@ public class ClusterWorkerHeartbeat implements org.apache.thrift.TBase<ClusterWo oprot.writeFieldBegin(EXECUTOR_STATS_FIELD_DESC); { oprot.writeMapBegin(new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRUCT, org.apache.thrift.protocol.TType.STRUCT, struct.executor_stats.size())); - for (Map.Entry<ExecutorInfo, ExecutorStats> _iter604 : struct.executor_stats.entrySet()) + for (Map.Entry<ExecutorInfo, ExecutorStats> _iter612 : struct.executor_stats.entrySet()) { - _iter604.getKey().write(oprot); - _iter604.getValue().write(oprot); + _iter612.getKey().write(oprot); + _iter612.getValue().write(oprot); } oprot.writeMapEnd(); } @@ -727,10 +727,10 @@ public class ClusterWorkerHeartbeat implements org.apache.thrift.TBase<ClusterWo oprot.writeString(struct.storm_id); { oprot.writeI32(struct.executor_stats.size()); - for (Map.Entry<ExecutorInfo, ExecutorStats> _iter605 : struct.executor_stats.entrySet()) + for (Map.Entry<ExecutorInfo, ExecutorStats> _iter613 : struct.executor_stats.entrySet()) { - _iter605.getKey().write(oprot); - _iter605.getValue().write(oprot); + _iter613.getKey().write(oprot); + _iter613.getValue().write(oprot); } } oprot.writeI32(struct.time_secs); @@ -743,17 +743,17 @@ public class ClusterWorkerHeartbeat implements org.apache.thrift.TBase<ClusterWo struct.storm_id = iprot.readString(); struct.set_storm_id_isSet(true); { - org.apache.thrift.protocol.TMap _map606 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRUCT, org.apache.thrift.protocol.TType.STRUCT, iprot.readI32()); - struct.executor_stats = new HashMap<ExecutorInfo,ExecutorStats>(2*_map606.size); - ExecutorInfo _key607; - ExecutorStats _val608; - for (int _i609 = 0; _i609 < _map606.size; ++_i609) + org.apache.thrift.protocol.TMap _map614 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRUCT, org.apache.thrift.protocol.TType.STRUCT, iprot.readI32()); + struct.executor_stats = new HashMap<ExecutorInfo,ExecutorStats>(2*_map614.size); + ExecutorInfo _key615; + ExecutorStats _val616; + for (int _i617 = 0; _i617 < _map614.size; ++_i617) { - _key607 = new ExecutorInfo(); - _key607.read(iprot); - _val608 = new ExecutorStats(); - _val608.read(iprot); - struct.executor_stats.put(_key607, _val608); + _key615 = new ExecutorInfo(); + _key615.read(iprot); + _val616 = new ExecutorStats(); + _val616.read(iprot); + struct.executor_stats.put(_key615, _val616); } } struct.set_executor_stats_isSet(true);