Repository: airavata Updated Branches: refs/heads/develop cdce06d96 -> a82e34ec0
class DataCollector(Thread):
    """Collects data at a fixed interval by running a user-specified routine.

    Inherits from: threading.Thread

    Instance variables:
    name -- the name of the collector
    limit -- the maximum number of maintained data points
    interval -- the interval (in seconds) at which data collection is performed

    Public methods:
    activate -- start collecting data
    add_routing_key -- add a new streaming endpoint
    deactivate -- stop further data collection
    remove_routing_key -- remove a streaming endpoint
    run -- collect data if active
    stop -- detach all endpoints and shut the producer down
    """
    def __init__(self, name, callback, rabbitmq_url, exchange, exchange_type="direct",
                 limit=250, interval=10, postprocessor=None, callback_args=None,
                 postprocessor_args=None):
        """
        Arguments:
        name -- the name of the collector
        callback -- the data collection function to run
        rabbitmq_url -- URL of the RabbitMQ server the producer publishes to
        exchange -- name of the exchange the producer publishes to

        Keyword arguments:
        exchange_type -- RabbitMQ exchange type (default "direct")
        limit -- the maximum number of maintained data points (default 250)
        interval -- the time interval in seconds at which to collect data
                    (default: 10)
        postprocessor -- a function to run on the return value of callback
                         (default None)
        callback_args -- the list of arguments to pass to the callback
                         (default None, treated as [])
        postprocessor_args -- the list of arguments to pass to the
                              postprocessor (default None, treated as [])
        """
        super(DataCollector, self).__init__()
        self.name = name if name else "Unknown Resource"
        self.limit = limit
        self.interval = interval
        self._callback = callback
        # FIX: default to fresh lists so instances never share a mutable
        # default argument.
        self._callback_args = callback_args if callback_args is not None else []
        self._postprocessor = postprocessor
        self._postprocessor_args = postprocessor_args if postprocessor_args is not None else []
        self._data = []
        # NOTE(review): this lock is never acquired in run(); confirm whether
        # _data is ever read from another thread.
        self._data_lock = Lock()
        self._active = False
        self._producer = PikaProducer(rabbitmq_url, exchange,
                                      exchange_type=exchange_type, routing_keys=[])

    def activate(self):
        """
        Start collecting data.
        """
        self._active = True

    def add_routing_key(self, key):
        """
        Add a new producer endpoint.
        """
        self._producer.add_routing_key(key)

    def deactivate(self):
        """
        Stop collecting data.
        """
        self._active = False

    def remove_routing_key(self, key):
        """Remove a producer endpoint, shutting the producer down if none remain."""
        self._producer.remove_routing_key(key)
        # BUG FIX: PikaProducer has no `endpoints` attribute; its key list is
        # stored as _routing_keys.
        if len(self._producer._routing_keys) == 0:
            self._producer.shutdown()

    def run(self):
        """
        Run the callback and postprocessing subroutines and record the result.

        Catches generic exceptions because the function being run is not
        known beforehand.
        """
        self._collection_event = Event()
        # Event.wait returns False on timeout, so the loop ticks every
        # `interval` seconds until deactivate() is called (the event itself is
        # never set).
        while self._active and not self._collection_event.wait(timeout=self.interval):
            try:
                result = self._callback(*self._callback_args)
                result = self._postprocessor(result, *self._postprocessor_args) if self._postprocessor else result
                self._data.append(result)
                if len(self._data) > self.limit:
                    self._data.pop(0)  # drop the oldest point to honor the limit
                self._producer(copy.copy(self._data))
            except Exception as e:
                # Deliberate catch-all: user-supplied callbacks must not kill
                # the collection thread.
                print("[ERROR] %s" % (e))

    def stop(self):
        """Detach every routing key; the producer shuts down when the last is removed."""
        # BUG FIX: the original referenced self.producer.routing_keys, neither
        # of which exists (attributes are self._producer._routing_keys).
        # Iterate over a copy because remove_routing_key mutates the list.
        for key in list(self._producer._routing_keys):
            self.remove_routing_key(key)
class DataReporter(object):
    """Manages a set of named DataCollectors.

    Instance variables:
    collectors -- a dict of DataCollectors indexed by name

    Public methods:
    add_collector -- add a new DataCollector to the set
    start_collecting -- begin data collection for all collectors
    start_collector -- begin data collection for a specific collector
    stop_collecting -- stop all data collection
    stop_collector -- stop a running DataCollector
    start_streaming -- attach a routing key to a collector's producer
    stop_streaming -- detach a routing key (not yet implemented)
    """

    def __init__(self, collectors=None):
        """
        Keyword arguments:
        collectors -- optional mapping of name -> DataCollector to manage
                      (default None, treated as empty)
        """
        super(DataReporter, self).__init__()
        self.collectors = {}
        # BUG FIX: the original iterated `for key, value in collectors:`,
        # which unpacks dict *keys*, and passed add_collector's arguments in
        # the wrong positional order (limit where callback belongs, etc.).
        # The collectors are already constructed, so register them directly.
        if collectors:
            for key, value in collectors.items():
                self.collectors[key] = value

    def add_collector(self, name, callback, rabbitmq_url, exchange, limit=250, interval=10,
                      postprocessor=None, exchange_type="direct", callback_args=[],
                      postprocessor_args=[]):
        """Create and register a new DataCollector.

        Arguments:
        name -- name of the new DataCollector
        callback -- the data collection callback to run
        rabbitmq_url -- URL of the RabbitMQ server to publish to
        exchange -- name of the exchange to publish to

        Keyword arguments:
        limit -- the number of data points to store (default 250)
        interval -- seconds between collection runs (default 10)
        postprocessor -- a postprocessing function to run on each data point
                         (default None)
        exchange_type -- RabbitMQ exchange type (default "direct")
        callback_args -- a list of arguments to pass to the callback
                         (default [])
        postprocessor_args -- a list of arguments to pass to the postprocessor
                              (default [])

        Raises:
        CollectorExistsException if a collector named name already exists
        """
        if name in self.collectors:
            raise CollectorExistsException

        self.collectors[name] = DataCollector(
            name,
            callback,
            rabbitmq_url,
            exchange,
            limit=limit,
            interval=interval,
            postprocessor=postprocessor,
            exchange_type=exchange_type,
            callback_args=callback_args,
            postprocessor_args=postprocessor_args
        )

    def start_collecting(self):
        """
        Start data collection for all associated collectors.
        """
        for collector in self.collectors:
            self.start_collector(collector)

    def start_collector(self, name):
        """
        Activate the specified collector and start its thread.

        Arguments:
        name -- the name of the collector to start

        Prints an error if the collector has already been started (Thread.start
        raises RuntimeError on a second call).
        """
        try:
            self.collectors[name].activate()
            self.collectors[name].start()
        except RuntimeError as e:
            print("Error starting collector ", name)
            print(e)

    def stop_collecting(self):
        """
        Stop all collectors.
        """
        for collector in self.collectors:
            self.stop_collector(collector)

    def stop_collector(self, name):
        """Deactivate the specified collector and join its thread.

        Arguments:
        name -- the name of the collector to stop

        Raises:
        CollectorDoesNotExistException if no collector named name exists
        """
        if name not in self.collectors:
            raise CollectorDoesNotExistException

        try:
            self.collectors[name].deactivate()
            self.collectors[name].join()
        except RuntimeError as e:  # Catch join-before-start / deadlock errors
            print(e)

    def start_streaming(self, collector_name, routing_key):
        """
        Begin streaming data from a collector to a particular recipient.

        Arguments:
        collector_name -- the collector whose producer should stream
        routing_key -- the routing key to reach the intended recipient

        Raises:
        CollectorDoesNotExistException if no collector named collector_name exists
        """
        if collector_name not in self.collectors:  # Make sure collector exists
            raise CollectorDoesNotExistException
        self.collectors[collector_name].add_routing_key(routing_key)

    def stop_streaming(self, collector_name, routing_key):
        """
        Stop a particular stream. NOTE(review): not yet implemented.

        Arguments:
        collector_name -- the collector associated with the producer to stop
        routing_key -- the routing key to reach the intended recipient
        """
        pass
class EventHandler(object):
    """Responds to a single user-defined event by running a stored callable.

    Instance variables:
    name -- the identifier of this handler
    """
    def __init__(self, name, handler, handler_args=None):
        """
        Arguments:
        name -- the identifier of this handler
        handler -- the callable to run when this handler fires

        Keyword arguments:
        handler_args -- positional arguments passed to handler on each call
                        (default None, treated as [])
        """
        self.name = name
        self._handler = handler
        # BUG FIX: the original line was the bare expression
        # `self._handler_args` (no assignment), which raised AttributeError on
        # every construction. Also avoid the shared mutable default argument.
        self._handler_args = handler_args if handler_args is not None else []

    def __call__(self):
        """Invoke the stored handler with its arguments and return its result."""
        # The original body was a stub (`pass`); invoking the stored handler
        # is the evident intent, since EventMonitor calls handlers directly.
        return self._handler(*self._handler_args)
class EventMonitor(object):
    """Checks data for user-defined bounds violations and dispatches handlers.

    Instance variables:
    handlers -- a dict of handler callables indexed by name
    """
    def __init__(self, event_check, handlers=None):
        """
        Arguments:
        event_check -- callable mapping a value to an iterable of handler names

        Keyword arguments:
        handlers -- dict of handler callables indexed by name
                    (default None, treated as {})
        """
        self._event_check = event_check
        # Avoid the shared mutable default argument.
        self.handlers = handlers if handlers is not None else {}

    def __call__(self, val):
        """Run the event check on val and dispatch every handler it names.

        Raises:
        EventCheckerNotCallableException if event_check is not callable
        """
        if not callable(self._event_check):
            raise EventCheckerNotCallableException
        # BUG FIX: the checker is stored as self._event_check; the original
        # referenced a nonexistent self.event_check attribute.
        self._run_handler(self._event_check(val))

    def _run_handler(self, handler_names):
        """Invoke each named handler after validating existence and callability.

        Raises:
        EventHandlerDoesNotExistException if a name is not in handlers
        EventHandlerNotCallableException if a registered handler is not callable
        """
        for name in handler_names:
            if name not in self.handlers:
                raise EventHandlerDoesNotExistException
            if not callable(self.handlers[name]):
                raise EventHandlerNotCallableException
            self.handlers[name]()
class PikaAsyncConsumer(object):
    """
    The primary entry point for routing incoming messages to the proper handler.
    """

    def __init__(self, rabbitmq_url, exchange_name, queue_name, message_handler,
                 exchange_type="direct", routing_key="#"):
        """
        Create a new instance of the consumer.

        Arguments:
        rabbitmq_url -- URL to RabbitMQ server
        exchange_name -- name of RabbitMQ exchange to join
        queue_name -- name of RabbitMQ queue to join
        message_handler -- callable invoked with each received message body

        Keyword Arguments:
        exchange_type -- one of 'direct', 'topic', 'fanout', 'headers'
                         (default 'direct')
        routing_key -- the routing key that this consumer listens for
                       (default '#', receives all messages)
        """
        self._connection = None
        self._channel = None
        self._shut_down = False
        self._consumer_tag = None
        self._url = rabbitmq_url
        self._message_handler = message_handler

        # The following are necessary to guarantee that both the RabbitMQ
        # server and this consumer know where to look for messages. These
        # names are decided before dispatch and should be recorded in a
        # config file or else on a per-job basis.
        self._exchange = exchange_name
        self._exchange_type = exchange_type
        self._queue = queue_name
        self._routing_key = routing_key

    def connect(self):
        """
        Create an asynchronous connection to the RabbitMQ server at URL.
        """
        return pika.SelectConnection(pika.URLParameters(self._url),
                                     on_open_callback=self.on_connection_open,
                                     on_close_callback=self.on_connection_close,
                                     stop_ioloop_on_close=False)

    def on_connection_open(self, unused_connection):
        """
        Actions to perform when the connection opens. This may not happen
        immediately, so defer action to this callback.

        Arguments:
        unused_connection -- the created connection (by this point already
                             available as self._connection)
        """
        self._connection.channel(on_open_callback=self.on_channel_open)

    def on_connection_close(self, connection, code, text):
        """
        Actions to perform when the connection is unexpectedly closed by the
        RabbitMQ server.

        Arguments:
        connection -- the connection that was closed (same as self._connection)
        code -- response code from the RabbitMQ server
        text -- response body from the RabbitMQ server
        """
        self._channel = None
        if self._shut_down:
            self._connection.ioloop.stop()
        else:
            # Unexpected drop: retry after a short delay.
            self._connection.add_timeout(5, self.reconnect)

    def reconnect(self):
        """
        Attempt to reestablish a connection with the RabbitMQ server.
        """
        self._connection.ioloop.stop()  # Stop the ioloop to completely close

        if not self._shut_down:  # Connect and restart the ioloop
            self._connection = self.connect()
            self._connection.ioloop.start()

    def on_channel_open(self, channel):
        """
        Store the opened channel for future use and set up the exchange and
        queue to be used.

        Arguments:
        channel -- the Channel instance opened by the Channel.Open RPC
        """
        self._channel = channel
        self._channel.add_on_close_callback(self.on_channel_close)
        self.declare_exchange()

    def on_channel_close(self, channel, code, text):
        """
        Actions to perform when the channel is unexpectedly closed by the
        RabbitMQ server.

        Arguments:
        channel -- the channel that was closed (same as self._channel)
        code -- response code from the RabbitMQ server
        text -- response body from the RabbitMQ server
        """
        self._connection.close()

    def declare_exchange(self):
        """
        Set up the exchange that will route messages to this consumer. Each
        RabbitMQ exchange is uniquely identified by its name, so it does not
        matter if the exchange has already been declared.
        """
        self._channel.exchange_declare(self.declare_exchange_success,
                                       self._exchange,
                                       self._exchange_type)

    def declare_exchange_success(self, unused_connection):
        """
        Actions to perform on successful exchange declaration.
        """
        self.declare_queue()

    def declare_queue(self):
        """
        Set up the queue that will route messages to this consumer. Each
        RabbitMQ queue can be defined with routing keys to use only one
        queue for multiple jobs.
        """
        self._channel.queue_declare(self.declare_queue_success,
                                    self._queue)

    def declare_queue_success(self, method_frame):
        """
        Actions to perform on successful queue declaration: bind the queue
        and begin consuming.
        """
        self._channel.queue_bind(self.munch,
                                 self._queue,
                                 self._exchange,
                                 self._routing_key
                                 )

    def munch(self, unused):
        """
        Begin consuming messages from the queue.
        """
        self._channel.add_on_cancel_callback(self.cancel_channel)
        self._consumer_tag = self._channel.basic_consume(self._process_message)

    def cancel_channel(self, method_frame):
        """Close the channel when the broker cancels the consumer."""
        if self._channel is not None:
            # BUG FIX: the original called the private self._channel._close();
            # pika's public API is close().
            self._channel.close()

    def _process_message(self, ch, method, properties, body):
        """
        Receive and verify a message, then pass it to the handler.

        Arguments:
        ch -- the channel that routed the message
        method -- delivery information
        properties -- message properties
        body -- the message
        """
        print("Received Message: %s" % body)
        self._message_handler(body)
        #self._channel.basic_ack(delivery_tag=method.delivery_tag)

    def stop_consuming(self):
        """
        Stop the consumer if active.
        """
        if self._channel:
            self._channel.basic_cancel(self.close_channel, self._consumer_tag)

    def close_channel(self, unused_frame=None):
        """
        Close the channel to shut down the consumer and connection.

        NOTE(review): used as the basic_cancel callback, which pika invokes
        with a frame argument — hence the optional parameter.
        """
        self._channel.close()

    def start(self):
        """
        Start a connection with the RabbitMQ server and run its ioloop.
        """
        self._connection = self.connect()
        self._connection.ioloop.start()

    def stop(self):
        """
        Stop an active connection with the RabbitMQ server.
        """
        # BUG FIX: the original set self._closing, an attribute nothing reads.
        # The shutdown flag checked by on_connection_close/reconnect is
        # self._shut_down.
        self._shut_down = True
        self.stop_consuming()
class PikaProducer(object):
    """
    Utility for sending job data to a set of endpoints.
    """

    def __init__(self, rabbitmq_url, exchange, exchange_type="direct", routing_keys=None):
        """
        Instantiate a new PikaProducer.

        Arguments:
        rabbitmq_url -- the url of the RabbitMQ server to send to
        exchange -- the name of the exchange to send to

        Keyword Arguments:
        exchange_type -- one of 'direct', 'topic', 'fanout', 'headers'
                         (default 'direct')
        routing_keys -- the routing keys to the endpoints for this producer
                        (default None, treated as [])
        """
        self._url = rabbitmq_url
        self._exchange = exchange
        self._exchange_type = exchange_type
        # Avoid the shared mutable default argument.
        self._routing_keys = routing_keys if routing_keys is not None else []

        self._connection = None  # RabbitMQ connection object
        self._channel = None     # RabbitMQ channel object

        import random
        self._name = random.randint(0, 100)  # debug identifier for log lines

    def __call__(self, data):
        """
        Publish data to the RabbitMQ server.

        Arguments:
        data -- JSON serializable data to send
        """
        if self._connection is None:  # Start the connection if it is inactive
            self.start()
        else:  # Serialize and send the data
            message = self.pack_data(data)
            self.send_data(message)

    def add_routing_key(self, key):
        """
        Add a new endpoint that will receive this data.

        Arguments:
        key -- the routing key for the new endpoint
        """
        if key not in self._routing_keys:
            self._routing_keys.append(key)

    def remove_routing_key(self, key):
        """
        Stop sending data to an existing endpoint. Unknown keys are ignored.

        Arguments:
        key -- the routing key for the existing endpoint
        """
        try:
            self._routing_keys.remove(key)
        except ValueError:
            pass

    def pack_data(self, data):
        """
        JSON-serialize the data for transport. On serialization failure an
        {"err": ...} JSON string is returned instead of raising.

        Arguments:
        data -- JSON-serializable data
        """
        try:  # Generate a JSON string from the data
            msg = json.dumps(data)
        except TypeError as e:  # Generate and return an error if serialization fails
            msg = json.dumps({"err": str(e)})
        finally:
            return msg

    def send_data(self, data):
        """
        Send the data to all active endpoints.

        Arguments:
        data -- the message to send
        """
        if self._channel is not None:  # Make sure the connection is active
            for key in self._routing_keys:  # Send to all endpoints
                self._channel.basic_publish(exchange=self._exchange,
                                            routing_key=key,
                                            body=data)

    def start(self):
        """
        Open a blocking connection and declare the exchange if no connection
        exists yet.
        """
        print("Starting new connection")
        if self._connection is None:
            print("Creating connection object")
            self._connection = pika.BlockingConnection(pika.URLParameters(self._url))
            self._channel = self._connection.channel()
            self._channel.exchange_declare(exchange=self._exchange,
                                           type=self._exchange_type)

    def shutdown(self):
        """
        Close an existing connection.
        """
        if self._channel is not None:
            self._channel.close()
            # BUG FIX: also close the connection and reset both handles so a
            # later __call__ can transparently reconnect via start().
            self._connection.close()
            self._channel = None
            self._connection = None

    def _on_connection_open(self, unused_connection):
        """
        Create a new channel if the connection opens successfully.

        Arguments:
        unused_connection -- a reference to self._connection
        """
        print("Connection is open")
        self._connection.channel(on_open_callback=self._on_channel_open)

    def _on_connection_close(self, connection, code, text):
        """
        Actions to take when the connection is closed for any reason.

        Arguments:
        connection -- the connection that was closed (same as self._connection)
        code -- response code from the RabbitMQ server
        text -- response body from the RabbitMQ server
        """
        print("Connection is closed")
        self._channel = None
        self._connection = None

    def _on_channel_open(self, channel):
        """
        Actions to take when the channel opens.

        Arguments:
        channel -- the newly opened channel
        """
        print("Channel is open")
        self._channel = channel
        self._channel.add_on_close_callback(self._on_channel_close)
        self._declare_exchange()

    def _on_channel_close(self, channel, code, text):
        """
        Actions to take when the channel closes for any reason.

        Arguments:
        channel -- the channel that was closed (same as self._channel)
        code -- response code from the RabbitMQ server
        text -- response body from the RabbitMQ server
        """
        print("Channel is closed")
        self._connection.close()

    def _declare_exchange(self):
        """
        Set up the exchange to publish to even if it already exists.
        """
        print("Exchange is declared")
        # BUG FIX: the attribute is self._exchange_type; the original
        # referenced a nonexistent self.exchange_type.
        self._channel.exchange_declare(exchange=self._exchange,
                                       type=self._exchange_type)

if __name__ == "__main__":
    import time

    config = {
        "url": "amqp://guest:guest@localhost:5672",
        "exchange": "simstream",
        "routing_key": "test_consumer",
        "exchange_type": "topic"
    }

    producer = PikaProducer(config["url"],
                            config["exchange"],
                            exchange_type=config["exchange_type"],
                            routing_keys=[config["routing_key"]])
    producer.start()

    while True:
        try:
            time.sleep(5)
            data = str(time.time()) + ": Hello SimStream"
            producer.send_data(data)
        except KeyboardInterrupt:
            producer.shutdown()
            break  # BUG FIX: the original kept looping after shutdown
class SimStream(object):
    """
    Manager for routing messages to their correct reporter.
    """

    DEFAULT_CONFIG_PATH = "simstream.cnf"

    class MessageParser(object):
        """
        Internal message parsing facilities. NOTE(review): still a stub —
        __call__ does not yet populate any fields.
        """

        def __init__(self):
            self.parsed = None

        def __call__(self, message):
            pass

    def __init__(self, reporters=None, config=None):
        """
        Keyword arguments:
        reporters -- optional mapping of name -> DataReporter (default None -> {})
        config -- optional configuration dict (default None -> {})
        """
        # Avoid shared mutable default arguments.
        self.reporters = reporters if reporters is not None else {}
        self.consumer = None
        self.config = config if config is not None else {}

    def add_data_reporter(self, reporter):
        """
        Add a new DataReporter object, keyed by its name.

        Arguments:
        reporter -- the DataReporter to add

        Raises:
        ReporterExistsException if a reporter with the same name exists
        """
        if reporter.name in self.reporters:
            raise ReporterExistsException
        self.reporters[reporter.name] = reporter

    def parse_config(self):
        """
        Read the config file and set up the specified data collection and
        event handling resources.
        """
        # TODO: Read in config
        # TODO: Set up configuration dict
        pass

    def route_message(self, message):
        """
        Send a message to the correct reporter.
        """
        # BUG FIX: MessageParser is a nested class, so the bare name raised
        # NameError; qualify it through self.
        parser = self.MessageParser()
        parser(message)
        # NOTE(review): MessageParser does not yet set reporter_name /
        # collector_name / routing_key — this path depends on the parser
        # being completed.
        if parser.reporter_name in self.reporters:
            self.reporters[parser.reporter_name].start_streaming(
                parser.collector_name,
                parser.routing_key
            )

    def start_collecting(self):
        """
        Begin collecting data and monitoring for events on every reporter.
        """
        for reporter in self.reporters:
            self.reporters[reporter].start_collecting()

    def stop_collecting(self):
        """
        Stop data collection on every reporter.

        BUG FIX: stop() called this method but it did not exist.
        """
        for reporter in self.reporters:
            self.reporters[reporter].stop_collecting()

    def setup(self):
        """
        Set up the SimStream instance: create DataCollectors, create
        EventMonitors, configure AMQP consumer.
        """
        self.parse_config()
        #self.setup_consumer()
        self.setup_data_collection()
        self.setup_event_monitoring()

    def setup_data_collection(self):
        """
        Set up all DataReporters and DataCollectors from the config dict.
        """
        # TODO: Create and configure all DataReporters
        # TODO: Create and configure all DataCollectors
        # TODO: Assign each DataCollector to the correct DataReporter
        if "reporters" in self.config:
            # BUG FIX: config is a dict; the original used attribute access
            # (self.config.reporters), which raises AttributeError.
            for reporter in self.config["reporters"]:
                pass
            for collector in self.config.get("collectors", []):
                pass

    def setup_event_monitoring(self):
        #TODO: Create and configure all EventMonitors
        #TODO: Create and configure all EventHandlers
        #TODO: Assign each EventHandler to the correct EventMonitor
        #TODO: Assign each EventMonitor to the correct DataCollector
        pass

    def setup_consumer(self):
        """
        Set up and configure the consumer from the config dict.
        """
        if len(self.config) > 0 and self.consumer is None:
            if "message_handler" in self.config:
                message_handler = self.config["message_handler"]
            else:
                message_handler = self.route_message
            self.consumer = PikaAsyncConsumer(self.config["url"],
                                              self.config["exchange"],
                                              self.config["queue"],
                                              message_handler,
                                              exchange_type=self.config["exchange_type"],
                                              routing_key=self.config["routing_key"]
                                              )

    def start(self):
        """
        Configure and start SimStream.
        """
        if self.consumer is None:
            self.setup()
        self.start_collecting()
        #self.consumer.start()

    def stop(self):
        """
        Stop all data collection, event monitoring, and message consumption.
        """
        self.consumer.stop()
        self.stop_collecting()


if __name__ == "__main__":
    def print_message(message):
        # BUG FIX: the original opened the file but printed to stdout;
        # direct the output into the file handle.
        with open("test.out", "w") as f:
            print(message, file=f)

    print(SimStream.DEFAULT_CONFIG_PATH)

    config = {
        "url": "amqp://guest:guest@localhost:5672",
        "exchange": "simstream",
        "queue": "simstream_test",
        "message_handler": print_message,
        "routing_key": "test_consumer",
        "exchange_type": "topic"
    }

    streamer = SimStream(config=config)

    try:
        streamer.start()
    except KeyboardInterrupt:
        streamer.stop()
