http://git-wip-us.apache.org/repos/asf/storm/blob/b03ce6b2/storm-core/src/jvm/backtype/storm/generated/TopologyPageInfo.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/generated/TopologyPageInfo.java b/storm-core/src/jvm/backtype/storm/generated/TopologyPageInfo.java index 4813fde..c943cac 100644 --- a/storm-core/src/jvm/backtype/storm/generated/TopologyPageInfo.java +++ b/storm-core/src/jvm/backtype/storm/generated/TopologyPageInfo.java @@ -2041,16 +2041,16 @@ public class TopologyPageInfo implements org.apache.thrift.TBase<TopologyPageInf case 9: // ID_TO_SPOUT_AGG_STATS if (schemeField.type == org.apache.thrift.protocol.TType.MAP) { { - org.apache.thrift.protocol.TMap _map394 = iprot.readMapBegin(); - struct.id_to_spout_agg_stats = new HashMap<String,ComponentAggregateStats>(2*_map394.size); - String _key395; - ComponentAggregateStats _val396; - for (int _i397 = 0; _i397 < _map394.size; ++_i397) + org.apache.thrift.protocol.TMap _map402 = iprot.readMapBegin(); + struct.id_to_spout_agg_stats = new HashMap<String,ComponentAggregateStats>(2*_map402.size); + String _key403; + ComponentAggregateStats _val404; + for (int _i405 = 0; _i405 < _map402.size; ++_i405) { - _key395 = iprot.readString(); - _val396 = new ComponentAggregateStats(); - _val396.read(iprot); - struct.id_to_spout_agg_stats.put(_key395, _val396); + _key403 = iprot.readString(); + _val404 = new ComponentAggregateStats(); + _val404.read(iprot); + struct.id_to_spout_agg_stats.put(_key403, _val404); } iprot.readMapEnd(); } @@ -2062,16 +2062,16 @@ public class TopologyPageInfo implements org.apache.thrift.TBase<TopologyPageInf case 10: // ID_TO_BOLT_AGG_STATS if (schemeField.type == org.apache.thrift.protocol.TType.MAP) { { - org.apache.thrift.protocol.TMap _map398 = iprot.readMapBegin(); - struct.id_to_bolt_agg_stats = new HashMap<String,ComponentAggregateStats>(2*_map398.size); - String _key399; - ComponentAggregateStats _val400; - for (int _i401 = 0; _i401 < _map398.size; ++_i401) + org.apache.thrift.protocol.TMap _map406 = iprot.readMapBegin(); + struct.id_to_bolt_agg_stats = new HashMap<String,ComponentAggregateStats>(2*_map406.size); + String _key407; + ComponentAggregateStats _val408; + for (int _i409 = 0; _i409 < _map406.size; ++_i409) { - _key399 = iprot.readString(); - _val400 = new ComponentAggregateStats(); - _val400.read(iprot); - struct.id_to_bolt_agg_stats.put(_key399, _val400); + _key407 = iprot.readString(); + _val408 = new ComponentAggregateStats(); + _val408.read(iprot); + struct.id_to_bolt_agg_stats.put(_key407, _val408); } iprot.readMapEnd(); } @@ -2234,10 +2234,10 @@ public class TopologyPageInfo implements org.apache.thrift.TBase<TopologyPageInf oprot.writeFieldBegin(ID_TO_SPOUT_AGG_STATS_FIELD_DESC); { oprot.writeMapBegin(new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.STRUCT, struct.id_to_spout_agg_stats.size())); - for (Map.Entry<String, ComponentAggregateStats> _iter402 : struct.id_to_spout_agg_stats.entrySet()) + for (Map.Entry<String, ComponentAggregateStats> _iter410 : struct.id_to_spout_agg_stats.entrySet()) { - oprot.writeString(_iter402.getKey()); - _iter402.getValue().write(oprot); + oprot.writeString(_iter410.getKey()); + _iter410.getValue().write(oprot); } oprot.writeMapEnd(); } @@ -2249,10 +2249,10 @@ public class TopologyPageInfo implements org.apache.thrift.TBase<TopologyPageInf oprot.writeFieldBegin(ID_TO_BOLT_AGG_STATS_FIELD_DESC); { oprot.writeMapBegin(new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.STRUCT, struct.id_to_bolt_agg_stats.size())); - for (Map.Entry<String, ComponentAggregateStats> _iter403 : struct.id_to_bolt_agg_stats.entrySet()) + for (Map.Entry<String, ComponentAggregateStats> _iter411 : struct.id_to_bolt_agg_stats.entrySet()) { - oprot.writeString(_iter403.getKey()); - _iter403.getValue().write(oprot); + oprot.writeString(_iter411.getKey()); + _iter411.getValue().write(oprot); } oprot.writeMapEnd(); } @@ -2426,20 +2426,20 @@ public class TopologyPageInfo implements org.apache.thrift.TBase<TopologyPageInf if (struct.is_set_id_to_spout_agg_stats()) { { oprot.writeI32(struct.id_to_spout_agg_stats.size()); - for (Map.Entry<String, ComponentAggregateStats> _iter404 : struct.id_to_spout_agg_stats.entrySet()) + for (Map.Entry<String, ComponentAggregateStats> _iter412 : struct.id_to_spout_agg_stats.entrySet()) { - oprot.writeString(_iter404.getKey()); - _iter404.getValue().write(oprot); + oprot.writeString(_iter412.getKey()); + _iter412.getValue().write(oprot); } } } if (struct.is_set_id_to_bolt_agg_stats()) { { oprot.writeI32(struct.id_to_bolt_agg_stats.size()); - for (Map.Entry<String, ComponentAggregateStats> _iter405 : struct.id_to_bolt_agg_stats.entrySet()) + for (Map.Entry<String, ComponentAggregateStats> _iter413 : struct.id_to_bolt_agg_stats.entrySet()) { - oprot.writeString(_iter405.getKey()); - _iter405.getValue().write(oprot); + oprot.writeString(_iter413.getKey()); + _iter413.getValue().write(oprot); } } } @@ -2514,32 +2514,32 @@ public class TopologyPageInfo implements org.apache.thrift.TBase<TopologyPageInf } if (incoming.get(7)) { { - org.apache.thrift.protocol.TMap _map406 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.STRUCT, iprot.readI32()); - struct.id_to_spout_agg_stats = new HashMap<String,ComponentAggregateStats>(2*_map406.size); - String _key407; - ComponentAggregateStats _val408; - for (int _i409 = 0; _i409 < _map406.size; ++_i409) + org.apache.thrift.protocol.TMap _map414 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.STRUCT, iprot.readI32()); + struct.id_to_spout_agg_stats = new HashMap<String,ComponentAggregateStats>(2*_map414.size); + String _key415; + ComponentAggregateStats _val416; + for (int _i417 = 0; _i417 < _map414.size; ++_i417) { - _key407 = iprot.readString(); - _val408 = new ComponentAggregateStats(); - _val408.read(iprot); - struct.id_to_spout_agg_stats.put(_key407, _val408); + _key415 = iprot.readString(); + _val416 = new ComponentAggregateStats(); + _val416.read(iprot); + struct.id_to_spout_agg_stats.put(_key415, _val416); } } struct.set_id_to_spout_agg_stats_isSet(true); } if (incoming.get(8)) { { - org.apache.thrift.protocol.TMap _map410 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.STRUCT, iprot.readI32()); - struct.id_to_bolt_agg_stats = new HashMap<String,ComponentAggregateStats>(2*_map410.size); - String _key411; - ComponentAggregateStats _val412; - for (int _i413 = 0; _i413 < _map410.size; ++_i413) + org.apache.thrift.protocol.TMap _map418 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.STRUCT, iprot.readI32()); + struct.id_to_bolt_agg_stats = new HashMap<String,ComponentAggregateStats>(2*_map418.size); + String _key419; + ComponentAggregateStats _val420; + for (int _i421 = 0; _i421 < _map418.size; ++_i421) { - _key411 = iprot.readString(); - _val412 = new ComponentAggregateStats(); - _val412.read(iprot); - struct.id_to_bolt_agg_stats.put(_key411, _val412); + _key419 = iprot.readString(); + _val420 = new ComponentAggregateStats(); + _val420.read(iprot); + struct.id_to_bolt_agg_stats.put(_key419, _val420); } } struct.set_id_to_bolt_agg_stats_isSet(true);
http://git-wip-us.apache.org/repos/asf/storm/blob/b03ce6b2/storm-core/src/jvm/backtype/storm/generated/TopologyStats.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/generated/TopologyStats.java b/storm-core/src/jvm/backtype/storm/generated/TopologyStats.java index 99f3922..aa598e4 100644 --- a/storm-core/src/jvm/backtype/storm/generated/TopologyStats.java +++ b/storm-core/src/jvm/backtype/storm/generated/TopologyStats.java @@ -737,15 +737,15 @@ public class TopologyStats implements org.apache.thrift.TBase<TopologyStats, Top case 1: // WINDOW_TO_EMITTED if (schemeField.type == org.apache.thrift.protocol.TType.MAP) { { - org.apache.thrift.protocol.TMap _map344 = iprot.readMapBegin(); - struct.window_to_emitted = new HashMap<String,Long>(2*_map344.size); - String _key345; - long _val346; - for (int _i347 = 0; _i347 < _map344.size; ++_i347) + org.apache.thrift.protocol.TMap _map352 = iprot.readMapBegin(); + struct.window_to_emitted = new HashMap<String,Long>(2*_map352.size); + String _key353; + long _val354; + for (int _i355 = 0; _i355 < _map352.size; ++_i355) { - _key345 = iprot.readString(); - _val346 = iprot.readI64(); - struct.window_to_emitted.put(_key345, _val346); + _key353 = iprot.readString(); + _val354 = iprot.readI64(); + struct.window_to_emitted.put(_key353, _val354); } iprot.readMapEnd(); } @@ -757,15 +757,15 @@ public class TopologyStats implements org.apache.thrift.TBase<TopologyStats, Top case 2: // WINDOW_TO_TRANSFERRED if (schemeField.type == org.apache.thrift.protocol.TType.MAP) { { - org.apache.thrift.protocol.TMap _map348 = iprot.readMapBegin(); - struct.window_to_transferred = new HashMap<String,Long>(2*_map348.size); - String _key349; - long _val350; - for (int _i351 = 0; _i351 < _map348.size; ++_i351) + org.apache.thrift.protocol.TMap _map356 = iprot.readMapBegin(); + struct.window_to_transferred = new HashMap<String,Long>(2*_map356.size); + String _key357; + long _val358; + for (int _i359 = 0; _i359 < _map356.size; ++_i359) { - _key349 = iprot.readString(); - _val350 = iprot.readI64(); - struct.window_to_transferred.put(_key349, _val350); + _key357 = iprot.readString(); + _val358 = iprot.readI64(); + struct.window_to_transferred.put(_key357, _val358); } iprot.readMapEnd(); } @@ -777,15 +777,15 @@ public class TopologyStats implements org.apache.thrift.TBase<TopologyStats, Top case 3: // WINDOW_TO_COMPLETE_LATENCIES_MS if (schemeField.type == org.apache.thrift.protocol.TType.MAP) { { - org.apache.thrift.protocol.TMap _map352 = iprot.readMapBegin(); - struct.window_to_complete_latencies_ms = new HashMap<String,Double>(2*_map352.size); - String _key353; - double _val354; - for (int _i355 = 0; _i355 < _map352.size; ++_i355) + org.apache.thrift.protocol.TMap _map360 = iprot.readMapBegin(); + struct.window_to_complete_latencies_ms = new HashMap<String,Double>(2*_map360.size); + String _key361; + double _val362; + for (int _i363 = 0; _i363 < _map360.size; ++_i363) { - _key353 = iprot.readString(); - _val354 = iprot.readDouble(); - struct.window_to_complete_latencies_ms.put(_key353, _val354); + _key361 = iprot.readString(); + _val362 = iprot.readDouble(); + struct.window_to_complete_latencies_ms.put(_key361, _val362); } iprot.readMapEnd(); } @@ -797,15 +797,15 @@ public class TopologyStats implements org.apache.thrift.TBase<TopologyStats, Top case 4: // WINDOW_TO_ACKED if (schemeField.type == org.apache.thrift.protocol.TType.MAP) { { - org.apache.thrift.protocol.TMap _map356 = iprot.readMapBegin(); - struct.window_to_acked = new HashMap<String,Long>(2*_map356.size); - String _key357; - long _val358; - for (int _i359 = 0; _i359 < _map356.size; ++_i359) + org.apache.thrift.protocol.TMap _map364 = iprot.readMapBegin(); + struct.window_to_acked = new HashMap<String,Long>(2*_map364.size); + String _key365; + long _val366; + for (int _i367 = 0; _i367 < _map364.size; ++_i367) { - _key357 = iprot.readString(); - _val358 = iprot.readI64(); - struct.window_to_acked.put(_key357, _val358); + _key365 = iprot.readString(); + _val366 = iprot.readI64(); + struct.window_to_acked.put(_key365, _val366); } iprot.readMapEnd(); } @@ -817,15 +817,15 @@ public class TopologyStats implements org.apache.thrift.TBase<TopologyStats, Top case 5: // WINDOW_TO_FAILED if (schemeField.type == org.apache.thrift.protocol.TType.MAP) { { - org.apache.thrift.protocol.TMap _map360 = iprot.readMapBegin(); - struct.window_to_failed = new HashMap<String,Long>(2*_map360.size); - String _key361; - long _val362; - for (int _i363 = 0; _i363 < _map360.size; ++_i363) + org.apache.thrift.protocol.TMap _map368 = iprot.readMapBegin(); + struct.window_to_failed = new HashMap<String,Long>(2*_map368.size); + String _key369; + long _val370; + for (int _i371 = 0; _i371 < _map368.size; ++_i371) { - _key361 = iprot.readString(); - _val362 = iprot.readI64(); - struct.window_to_failed.put(_key361, _val362); + _key369 = iprot.readString(); + _val370 = iprot.readI64(); + struct.window_to_failed.put(_key369, _val370); } iprot.readMapEnd(); } @@ -852,10 +852,10 @@ public class TopologyStats implements org.apache.thrift.TBase<TopologyStats, Top oprot.writeFieldBegin(WINDOW_TO_EMITTED_FIELD_DESC); { oprot.writeMapBegin(new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.I64, struct.window_to_emitted.size())); - for (Map.Entry<String, Long> _iter364 : struct.window_to_emitted.entrySet()) + for (Map.Entry<String, Long> _iter372 : struct.window_to_emitted.entrySet()) { - oprot.writeString(_iter364.getKey()); - oprot.writeI64(_iter364.getValue()); + oprot.writeString(_iter372.getKey()); + oprot.writeI64(_iter372.getValue()); } oprot.writeMapEnd(); } @@ -867,10 +867,10 @@ public class TopologyStats implements org.apache.thrift.TBase<TopologyStats, Top oprot.writeFieldBegin(WINDOW_TO_TRANSFERRED_FIELD_DESC); { oprot.writeMapBegin(new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.I64, struct.window_to_transferred.size())); - for (Map.Entry<String, Long> _iter365 : struct.window_to_transferred.entrySet()) + for (Map.Entry<String, Long> _iter373 : struct.window_to_transferred.entrySet()) { - oprot.writeString(_iter365.getKey()); - oprot.writeI64(_iter365.getValue()); + oprot.writeString(_iter373.getKey()); + oprot.writeI64(_iter373.getValue()); } oprot.writeMapEnd(); } @@ -882,10 +882,10 @@ public class TopologyStats implements org.apache.thrift.TBase<TopologyStats, Top oprot.writeFieldBegin(WINDOW_TO_COMPLETE_LATENCIES_MS_FIELD_DESC); { oprot.writeMapBegin(new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.DOUBLE, struct.window_to_complete_latencies_ms.size())); - for (Map.Entry<String, Double> _iter366 : struct.window_to_complete_latencies_ms.entrySet()) + for (Map.Entry<String, Double> _iter374 : struct.window_to_complete_latencies_ms.entrySet()) { - oprot.writeString(_iter366.getKey()); - oprot.writeDouble(_iter366.getValue()); + oprot.writeString(_iter374.getKey()); + oprot.writeDouble(_iter374.getValue()); } oprot.writeMapEnd(); } @@ -897,10 +897,10 @@ public class TopologyStats implements org.apache.thrift.TBase<TopologyStats, Top oprot.writeFieldBegin(WINDOW_TO_ACKED_FIELD_DESC); { oprot.writeMapBegin(new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.I64, struct.window_to_acked.size())); - for (Map.Entry<String, Long> _iter367 : struct.window_to_acked.entrySet()) + for (Map.Entry<String, Long> _iter375 : struct.window_to_acked.entrySet()) { - oprot.writeString(_iter367.getKey()); - oprot.writeI64(_iter367.getValue()); + oprot.writeString(_iter375.getKey()); + oprot.writeI64(_iter375.getValue()); } oprot.writeMapEnd(); } @@ -912,10 +912,10 @@ public class TopologyStats implements org.apache.thrift.TBase<TopologyStats, Top oprot.writeFieldBegin(WINDOW_TO_FAILED_FIELD_DESC); { oprot.writeMapBegin(new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.I64, struct.window_to_failed.size())); - for (Map.Entry<String, Long> _iter368 : struct.window_to_failed.entrySet()) + for (Map.Entry<String, Long> _iter376 : struct.window_to_failed.entrySet()) { - oprot.writeString(_iter368.getKey()); - oprot.writeI64(_iter368.getValue()); + oprot.writeString(_iter376.getKey()); + oprot.writeI64(_iter376.getValue()); } oprot.writeMapEnd(); } @@ -959,50 +959,50 @@ public class TopologyStats implements org.apache.thrift.TBase<TopologyStats, Top if (struct.is_set_window_to_emitted()) { { oprot.writeI32(struct.window_to_emitted.size()); - for (Map.Entry<String, Long> _iter369 : struct.window_to_emitted.entrySet()) + for (Map.Entry<String, Long> _iter377 : struct.window_to_emitted.entrySet()) { - oprot.writeString(_iter369.getKey()); - oprot.writeI64(_iter369.getValue()); + oprot.writeString(_iter377.getKey()); + oprot.writeI64(_iter377.getValue()); } } } if (struct.is_set_window_to_transferred()) { { oprot.writeI32(struct.window_to_transferred.size()); - for (Map.Entry<String, Long> _iter370 : struct.window_to_transferred.entrySet()) + for (Map.Entry<String, Long> _iter378 : struct.window_to_transferred.entrySet()) { - oprot.writeString(_iter370.getKey()); - oprot.writeI64(_iter370.getValue()); + oprot.writeString(_iter378.getKey()); + oprot.writeI64(_iter378.getValue()); } } } if (struct.is_set_window_to_complete_latencies_ms()) { { oprot.writeI32(struct.window_to_complete_latencies_ms.size()); - for (Map.Entry<String, Double> _iter371 : struct.window_to_complete_latencies_ms.entrySet()) + for (Map.Entry<String, Double> _iter379 : struct.window_to_complete_latencies_ms.entrySet()) { - oprot.writeString(_iter371.getKey()); - oprot.writeDouble(_iter371.getValue()); + oprot.writeString(_iter379.getKey()); + oprot.writeDouble(_iter379.getValue()); } } } if (struct.is_set_window_to_acked()) { { oprot.writeI32(struct.window_to_acked.size()); - for (Map.Entry<String, Long> _iter372 : struct.window_to_acked.entrySet()) + for (Map.Entry<String, Long> _iter380 : struct.window_to_acked.entrySet()) { - oprot.writeString(_iter372.getKey()); - oprot.writeI64(_iter372.getValue()); + oprot.writeString(_iter380.getKey()); + oprot.writeI64(_iter380.getValue()); } } } if (struct.is_set_window_to_failed()) { { oprot.writeI32(struct.window_to_failed.size()); - for (Map.Entry<String, Long> _iter373 : struct.window_to_failed.entrySet()) + for (Map.Entry<String, Long> _iter381 : struct.window_to_failed.entrySet()) { - oprot.writeString(_iter373.getKey()); - oprot.writeI64(_iter373.getValue()); + oprot.writeString(_iter381.getKey()); + oprot.writeI64(_iter381.getValue()); } } } @@ -1014,75 +1014,75 @@ public class TopologyStats implements org.apache.thrift.TBase<TopologyStats, Top BitSet incoming = iprot.readBitSet(5); if (incoming.get(0)) { { - org.apache.thrift.protocol.TMap _map374 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.I64, iprot.readI32()); - struct.window_to_emitted = new HashMap<String,Long>(2*_map374.size); - String _key375; - long _val376; - for (int _i377 = 0; _i377 < _map374.size; ++_i377) + org.apache.thrift.protocol.TMap _map382 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.I64, iprot.readI32()); + struct.window_to_emitted = new HashMap<String,Long>(2*_map382.size); + String _key383; + long _val384; + for (int _i385 = 0; _i385 < _map382.size; ++_i385) { - _key375 = iprot.readString(); - _val376 = iprot.readI64(); - struct.window_to_emitted.put(_key375, _val376); + _key383 = iprot.readString(); + _val384 = iprot.readI64(); + struct.window_to_emitted.put(_key383, _val384); } } struct.set_window_to_emitted_isSet(true); } if (incoming.get(1)) { { - org.apache.thrift.protocol.TMap _map378 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.I64, iprot.readI32()); - struct.window_to_transferred = new HashMap<String,Long>(2*_map378.size); - String _key379; - long _val380; - for (int _i381 = 0; _i381 < _map378.size; ++_i381) + org.apache.thrift.protocol.TMap _map386 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.I64, iprot.readI32()); + struct.window_to_transferred = new HashMap<String,Long>(2*_map386.size); + String _key387; + long _val388; + for (int _i389 = 0; _i389 < _map386.size; ++_i389) { - _key379 = iprot.readString(); - _val380 = iprot.readI64(); - struct.window_to_transferred.put(_key379, _val380); + _key387 = iprot.readString(); + _val388 = iprot.readI64(); + struct.window_to_transferred.put(_key387, _val388); } } struct.set_window_to_transferred_isSet(true); } if (incoming.get(2)) { { - org.apache.thrift.protocol.TMap _map382 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.DOUBLE, iprot.readI32()); - struct.window_to_complete_latencies_ms = new HashMap<String,Double>(2*_map382.size); - String _key383; - double _val384; - for (int _i385 = 0; _i385 < _map382.size; ++_i385) + org.apache.thrift.protocol.TMap _map390 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.DOUBLE, iprot.readI32()); + struct.window_to_complete_latencies_ms = new HashMap<String,Double>(2*_map390.size); + String _key391; + double _val392; + for (int _i393 = 0; _i393 < _map390.size; ++_i393) { - _key383 = iprot.readString(); - _val384 = iprot.readDouble(); - struct.window_to_complete_latencies_ms.put(_key383, _val384); + _key391 = iprot.readString(); + _val392 = iprot.readDouble(); + struct.window_to_complete_latencies_ms.put(_key391, _val392); } } struct.set_window_to_complete_latencies_ms_isSet(true); } if (incoming.get(3)) { { - org.apache.thrift.protocol.TMap _map386 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.I64, iprot.readI32()); - struct.window_to_acked = new HashMap<String,Long>(2*_map386.size); - String _key387; - long _val388; - for (int _i389 = 0; _i389 < _map386.size; ++_i389) + org.apache.thrift.protocol.TMap _map394 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.I64, iprot.readI32()); + struct.window_to_acked = new HashMap<String,Long>(2*_map394.size); + String _key395; + long _val396; + for (int _i397 = 0; _i397 < _map394.size; ++_i397) { - _key387 = iprot.readString(); - _val388 = iprot.readI64(); - struct.window_to_acked.put(_key387, _val388); + _key395 = iprot.readString(); + _val396 = iprot.readI64(); + struct.window_to_acked.put(_key395, _val396); } } struct.set_window_to_acked_isSet(true); } if (incoming.get(4)) { { - org.apache.thrift.protocol.TMap _map390 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.I64, iprot.readI32()); - struct.window_to_failed = new HashMap<String,Long>(2*_map390.size); - String _key391; - long _val392; - for (int _i393 = 0; _i393 < _map390.size; ++_i393) + org.apache.thrift.protocol.TMap _map398 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.I64, iprot.readI32()); + struct.window_to_failed = new HashMap<String,Long>(2*_map398.size); + String _key399; + long _val400; + for (int _i401 = 0; _i401 < _map398.size; ++_i401) { - _key391 = iprot.readString(); - _val392 = iprot.readI64(); - struct.window_to_failed.put(_key391, _val392); + _key399 = iprot.readString(); + _val400 = iprot.readI64(); + struct.window_to_failed.put(_key399, _val400); } } struct.set_window_to_failed_isSet(true); http://git-wip-us.apache.org/repos/asf/storm/blob/b03ce6b2/storm-core/src/jvm/backtype/storm/hooks/BaseWorkerHook.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/hooks/BaseWorkerHook.java b/storm-core/src/jvm/backtype/storm/hooks/BaseWorkerHook.java new file mode 100644 index 0000000..6fe9f19 --- /dev/null +++ b/storm-core/src/jvm/backtype/storm/hooks/BaseWorkerHook.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package backtype.storm.hooks; + +import backtype.storm.task.WorkerTopologyContext; + +import java.util.List; +import java.util.Map; + +public class BaseWorkerHook implements IWorkerHook { + @Override + public void start(Map stormConf, WorkerTopologyContext context, List taskIds) { + + } + + @Override + public void shutdown() { + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/b03ce6b2/storm-core/src/jvm/backtype/storm/hooks/IWorkerHook.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/hooks/IWorkerHook.java b/storm-core/src/jvm/backtype/storm/hooks/IWorkerHook.java new file mode 100644 index 0000000..6c2bab2 --- /dev/null +++ b/storm-core/src/jvm/backtype/storm/hooks/IWorkerHook.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package backtype.storm.hooks; + +import backtype.storm.task.WorkerTopologyContext; + +import java.io.Serializable; +import java.util.List; +import java.util.Map; + +public interface IWorkerHook extends Serializable { + void start(Map stormConf, WorkerTopologyContext context, List taskIds); + void shutdown(); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/b03ce6b2/storm-core/src/jvm/backtype/storm/topology/TopologyBuilder.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/topology/TopologyBuilder.java b/storm-core/src/jvm/backtype/storm/topology/TopologyBuilder.java index 9536faa..965540e 100644 --- a/storm-core/src/jvm/backtype/storm/topology/TopologyBuilder.java +++ b/storm-core/src/jvm/backtype/storm/topology/TopologyBuilder.java @@ -18,25 +18,20 @@ package backtype.storm.topology; import backtype.storm.Config; -import backtype.storm.generated.Bolt; -import backtype.storm.generated.ComponentCommon; -import backtype.storm.generated.ComponentObject; -import backtype.storm.generated.GlobalStreamId; -import backtype.storm.generated.Grouping; -import backtype.storm.generated.NullStruct; -import backtype.storm.generated.SpoutSpec; -import backtype.storm.generated.StateSpoutSpec; -import backtype.storm.generated.StormTopology; +import backtype.storm.generated.*; import backtype.storm.grouping.CustomStreamGrouping; import backtype.storm.grouping.PartialKeyGrouping; +import backtype.storm.hooks.IWorkerHook; import backtype.storm.tuple.Fields; import backtype.storm.utils.Utils; +import org.json.simple.JSONValue; + +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; - import backtype.storm.windowing.TupleWindow; -import org.json.simple.JSONValue; /** * TopologyBuilder exposes the Java API for specifying a topology for Storm @@ -98,11 +93,13 @@ public class TopologyBuilder { // private Map<String, Map<GlobalStreamId, Grouping>> _inputs = new HashMap<String, Map<GlobalStreamId, Grouping>>(); private Map<String, StateSpoutSpec> _stateSpouts = new HashMap<>(); - - + private List<ByteBuffer> _workerHooks = new ArrayList<>(); + + public StormTopology createTopology() { Map<String, Bolt> boltSpecs = new HashMap<>(); Map<String, SpoutSpec> spoutSpecs = new HashMap<>(); + for(String boltId: _bolts.keySet()) { IRichBolt bolt = _bolts.get(boltId); ComponentCommon common = getComponentCommon(boltId, bolt); @@ -112,11 +109,15 @@ public class TopologyBuilder { IRichSpout spout = _spouts.get(spoutId); ComponentCommon common = getComponentCommon(spoutId, spout); spoutSpecs.put(spoutId, new SpoutSpec(ComponentObject.serialized_java(Utils.javaSerialize(spout)), common)); - } - return new StormTopology(spoutSpecs, - boltSpecs, - new HashMap<String, StateSpoutSpec>()); + + StormTopology stormTopology = new StormTopology(spoutSpecs, + boltSpecs, + new HashMap<String, StateSpoutSpec>()); + + stormTopology.set_worker_hooks(_workerHooks); + + return stormTopology; } /** @@ -230,6 +231,14 @@ public class TopologyBuilder { // TODO: finish } + /** + * Add a new worker lifecycle hook + * + * @param workerHook the lifecycle hook to add + */ + public void addWorkerHook(IWorkerHook workerHook) { + _workerHooks.add(ByteBuffer.wrap(Utils.javaSerialize(workerHook))); + } private void validateUnusedId(String id) { if(_bolts.containsKey(id)) { http://git-wip-us.apache.org/repos/asf/storm/blob/b03ce6b2/storm-core/src/jvm/backtype/storm/utils/ThriftTopologyUtils.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/utils/ThriftTopologyUtils.java b/storm-core/src/jvm/backtype/storm/utils/ThriftTopologyUtils.java index 8306d9b..d5c460f 100644 --- a/storm-core/src/jvm/backtype/storm/utils/ThriftTopologyUtils.java +++ b/storm-core/src/jvm/backtype/storm/utils/ThriftTopologyUtils.java @@ -27,30 +27,38 @@ import java.util.Map; import java.util.Set; public class ThriftTopologyUtils { + public static boolean isWorkerHook(StormTopology._Fields f) { + return f.equals(StormTopology._Fields.WORKER_HOOKS); + } + public static Set<String> getComponentIds(StormTopology topology) { Set<String> ret = new HashSet<String>(); for(StormTopology._Fields f: StormTopology.metaDataMap.keySet()) { - Map<String, Object> componentMap = (Map<String, Object>) topology.getFieldValue(f); - ret.addAll(componentMap.keySet()); + if(StormTopology.metaDataMap.get(f).valueMetaData.type == org.apache.thrift.protocol.TType.MAP) { + Map<String, Object> componentMap = (Map<String, Object>) topology.getFieldValue(f); + ret.addAll(componentMap.keySet()); + } } return ret; } public static ComponentCommon getComponentCommon(StormTopology topology, String componentId) { for(StormTopology._Fields f: StormTopology.metaDataMap.keySet()) { - Map<String, Object> componentMap = (Map<String, Object>) topology.getFieldValue(f); - if(componentMap.containsKey(componentId)) { - Object component = componentMap.get(componentId); - if(component instanceof Bolt) { - return ((Bolt) component).get_common(); - } - if(component instanceof SpoutSpec) { - return ((SpoutSpec) component).get_common(); - } - if(component instanceof StateSpoutSpec) { - return ((StateSpoutSpec) component).get_common(); + if(StormTopology.metaDataMap.get(f).valueMetaData.type == org.apache.thrift.protocol.TType.MAP) { + Map<String, Object> componentMap = (Map<String, Object>) topology.getFieldValue(f); + if(componentMap.containsKey(componentId)) { + Object component = componentMap.get(componentId); + if(component instanceof Bolt) { + return ((Bolt) component).get_common(); + } + if(component instanceof SpoutSpec) { + return ((SpoutSpec) component).get_common(); + } + if(component instanceof StateSpoutSpec) { + return ((StateSpoutSpec) component).get_common(); + } + throw new RuntimeException("Unreachable code! No get_common conversion for component " + component); } - throw new RuntimeException("Unreachable code! No get_common conversion for component " + component); } } throw new IllegalArgumentException("Could not find component common for " + componentId); http://git-wip-us.apache.org/repos/asf/storm/blob/b03ce6b2/storm-core/src/py/storm/Nimbus.py ---------------------------------------------------------------------- diff --git a/storm-core/src/py/storm/Nimbus.py b/storm-core/src/py/storm/Nimbus.py index c1e1b02..c2bb9ac 100644 --- a/storm-core/src/py/storm/Nimbus.py +++ b/storm-core/src/py/storm/Nimbus.py @@ -3811,11 +3811,11 @@ class getComponentPendingProfileActions_result: if fid == 0: if ftype == TType.LIST: self.success = [] - (_etype641, _size638) = iprot.readListBegin() - for _i642 in xrange(_size638): - _elem643 = ProfileRequest() - _elem643.read(iprot) - self.success.append(_elem643) + (_etype648, _size645) = iprot.readListBegin() + for _i649 in xrange(_size645): + _elem650 = ProfileRequest() + _elem650.read(iprot) + self.success.append(_elem650) iprot.readListEnd() else: iprot.skip(ftype) @@ -3832,8 +3832,8 @@ class getComponentPendingProfileActions_result: if self.success is not None: oprot.writeFieldBegin('success', TType.LIST, 0) oprot.writeListBegin(TType.STRUCT, len(self.success)) - for iter644 in self.success: - iter644.write(oprot) + for iter651 in self.success: + iter651.write(oprot) oprot.writeListEnd() oprot.writeFieldEnd() oprot.writeFieldStop()