#!python
# Source: ambari-common/src/test/python/coilmq/start.py (new file in commit c7612bcf)
"""
Entrypoint for starting the application.
"""
import os
import logging
import warnings


import time
import threading
from contextlib import contextmanager

is_nt = os.name == 'nt'

if not is_nt:
    import daemon as pydaemon
    import pid
else:
    pydaemon = pid = None

import click

from coilmq.config import config as global_config, init_config, init_logging, resolve_name
from coilmq.protocol import STOMP11
from coilmq.topic import TopicManager
from coilmq.queue import QueueManager
from coilmq.server.socket_server import ThreadedStompServer

__authors__ = ['"Hans Lellelid" <[email protected]>']
__copyright__ = "Copyright 2009 Hans Lellelid"
__license__ = """Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

 http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License."""

logger = logging.getLogger(__name__)


def server_from_config(config=None, server_class=None, additional_kwargs=None):
    """
    Gets a configured L{coilmq.server.StompServer} from specified config.

    If `config` is None, global L{coilmq.config.config} var will be used instead.

    The `server_class` and `additional_kwargs` are primarily hooks for using this method
    from a testing environment.

    @param config: A C{ConfigParser.ConfigParser} instance with loaded config values.
    @type config: C{ConfigParser.ConfigParser}

    @param server_class: Which class to use for the server; defaults to
                         L{ThreadedStompServer}. (This doesn't come from config currently.)
    @type server_class: C{class}

    @param additional_kwargs: Extra keyword arguments for the server class (a mapping or
                              an iterable of key/value pairs). They override the
                              defaults built here.
    @type additional_kwargs: C{dict}

    @return: The configured StompServer.
    @rtype: L{coilmq.server.StompServer}
    """
    if not config:
        config = global_config

    queue_store_factory = resolve_name(config.get('coilmq', 'qstore.factory'))
    subscriber_scheduler_factory = resolve_name(config.get(
        'coilmq', 'scheduler.subscriber_priority_factory'))
    queue_scheduler_factory = resolve_name(config.get(
        'coilmq', 'scheduler.queue_priority_factory'))

    # Authentication is optional: only build an authenticator when configured.
    if config.has_option('coilmq', 'auth.factory'):
        authenticator_factory = resolve_name(
            config.get('coilmq', 'auth.factory'))
        authenticator = authenticator_factory()
    else:
        authenticator = None

    address = (config.get('coilmq', 'listen_addr'),
               config.getint('coilmq', 'listen_port'))

    server_kwargs = dict(
        queue_manager=QueueManager(store=queue_store_factory(),
                                   subscriber_scheduler=subscriber_scheduler_factory(),
                                   queue_scheduler=queue_scheduler_factory()),
        topic_manager=TopicManager(),
        authenticator=authenticator,
        protocol=STOMP11)
    # Previously these two hooks were accepted but silently ignored.
    if additional_kwargs:
        server_kwargs.update(additional_kwargs)
    if server_class is None:
        server_class = ThreadedStompServer

    server = server_class(address, **server_kwargs)
    logger.info("Created server:%r" % server)
    return server


def _start_stats_thread(server, poll_interval):
    """
    Start a daemon thread that logs per-queue statistics every `poll_interval` seconds.
    """
    def diagnostic_loop(server):
        while True:
            logger.debug(
                "Stats heartbeat -------------------------------")
            store = server.queue_manager.store
            for dest in store.destinations():
                logger.debug("Queue %s: size=%s, subscribers=%s" % (
                    dest, store.size(dest), server.queue_manager.subscriber_count(dest)))

            time.sleep(poll_interval)

    diagnostic_thread = threading.Thread(
        target=diagnostic_loop, name='DiagnosticThread', args=(server,))
    # Daemon thread: must not keep the process alive after the server stops.
    diagnostic_thread.daemon = True
    diagnostic_thread.start()
    return diagnostic_thread


def context_serve(context, configfile, listen_addr, listen_port, logfile,
                  debug, daemon, uid, gid, pidfile, umask, rundir):
    """
    Takes a context object, which implements the __enter__/__exit__ "with" interface
    and starts a server within that context.

    This method is a refactored single-place for handling the server-run code whether
    running in daemon or non-daemon mode. It is invoked with a dummy (passthrough)
    context object for the non-daemon use case.

    @param context: The context object that implements __enter__/__exit__ "with" methods.
    @type context: C{object}

    @param configfile: Config file path, passed on to C{init_logging}.
    @param logfile: Log file path, passed on to C{init_logging}.
    @param debug: When true, log at DEBUG level and (if configured) start the stats thread.

    The remaining parameters (listen_addr, listen_port, daemon, uid, gid, pidfile, umask,
    rundir) are accepted for signature compatibility and are not read by this function.

    @raise SystemExit: Raised after logging, both for a user interrupt and for any
                       other exception raised while setting up or running the server.
    @see: server_from_config()
    """
    server = None
    try:
        with context:
            # There's a possibility here that init_logging() will throw an exception. If it does,
            # AND we're in a daemon context, then we're not going to be able to do anything with it.
            # We've got no stderr/stdout here; and so (to my knowledge) no reliable (& cross-platform),
            # way to display errors.
            level = logging.DEBUG if debug else logging.INFO
            init_logging(logfile=logfile, loglevel=level,
                         configfile=configfile)

            server = server_from_config()
            logger.info("Stomp server listening on %s:%s" % server.server_address)

            if debug:
                poll_interval = float(global_config.get(
                    'coilmq', 'debug.stats_poll_interval'))
                if poll_interval:  # Setting poll_interval to 0 effectively disables it.
                    _start_stats_thread(server, poll_interval)

            server.serve_forever()

    except (KeyboardInterrupt, SystemExit):
        logger.info("Stomp server stopped by user interrupt.")
        raise SystemExit()
    except Exception as e:
        logger.error("Stomp server stopped due to error: %s" % e)
        logger.exception(e)
        raise SystemExit()
    finally:
        if server:
            server.server_close()


def _main(config=None, host=None, port=None, logfile=None, debug=None,
          daemon=None, uid=None, gid=None, pidfile=None, umask=None, rundir=None):
    """
    Initialize configuration, build the run context and serve within it.

    Parameters mirror the command-line options of L{main}. `host` and `port`, when
    given, override the `listen_addr` / `listen_port` config values.
    """
    # Note that we must initialize the configuration before we enter the context
    # block; however, we _cannot_ initialize logging until we are in the context block
    # (so we defer that until the context_serve call.)
    init_config(config)

    if host is not None:
        global_config.set('coilmq', 'listen_addr', host)

    if port is not None:
        global_config.set('coilmq', 'listen_port', str(port))

    if daemon and is_nt:
        warnings.warn("Daemon context is not available for NT platform")

    if daemon and pydaemon:
        daemon_kwargs = dict(
            uid=uid,
            gid=gid,
            pidfile=pid.PidFile(pidname=pidfile) if pidfile else None)
        # Only pass umask / working directory when supplied, so that omitted options
        # fall back to DaemonContext's own defaults instead of failing on None.
        if umask is not None:
            daemon_kwargs['umask'] = int(umask, 8)
        if rundir is not None:
            daemon_kwargs['working_directory'] = rundir
        context = pydaemon.DaemonContext(**daemon_kwargs)
    else:
        # In non-daemon mode, we use a dummy context object
        # so we can use the same run-server code as the daemon version.
        context = contextmanager(lambda: (yield))()

    context_serve(context, config, host, port, logfile, debug, daemon, uid, gid, pidfile, umask, rundir)


@click.command()
@click.option("-c", "--config", help="Read configuration from FILE. (CLI options override config file.)", metavar="FILE")
@click.option("-b", "--host", help="Listen on specified address (default 127.0.0.1)", metavar="ADDR")
@click.option("-p", "--port", help="Listen on specified port (default 61613)", type=int, metavar="PORT")
@click.option("-l", "--logfile", help="Log to specified file (unless logging configured in config file).", metavar="FILE")
@click.option("--debug", default=False, help="Sets logging to debug (unless logging configured in config file).")
@click.option("-d", "--daemon", default=False, help="Run server as a daemon (default False).")
@click.option("-u", "--uid", help="The user/UID to use for daemon process.", metavar="UID")
@click.option("-g", "--gid", help="The group/GID to use for daemon process.", metavar="GID")
@click.option("--pidfile", help="The PID file to use.", metavar="FILE")
@click.option("--umask", help="Umask (octal) to apply for daemonized process.", metavar="MASK")
@click.option('--rundir', help="The working directory to use for the daemonized process (default /).", metavar="DIR")
def main(config, host, port, logfile, debug, daemon, uid, gid, pidfile, umask, rundir):
    """
    Main entry point for running a socket server from the commandline.

    This method will read in options from the commandline and call the L{config.init_config} method
    to get everything setup. Then, depending on whether daemon mode was specified or not,
    the process may be forked (or not) and the server will be started.
    """

    _main(**locals())


if __name__ == '__main__':
    try:
        main()
    except (KeyboardInterrupt, SystemExit):
        pass
    except Exception as e:
        logger.error("Server terminated due to error: %s" % e)
        logger.exception(e)
http://git-wip-us.apache.org/repos/asf/ambari/blob/c7612bcf/ambari-common/src/test/python/coilmq/store/__init__.py ---------------------------------------------------------------------- diff --git a/ambari-common/src/test/python/coilmq/store/__init__.py b/ambari-common/src/test/python/coilmq/store/__init__.py new file mode 100644 index 0000000..a0daf9d --- /dev/null +++ b/ambari-common/src/test/python/coilmq/store/__init__.py @@ -0,0 +1,189 @@ +""" +Storage containers for durable queues and (planned) durable topics. +""" +import abc +import logging +import threading + +from coilmq.util.concurrency import synchronized + +__authors__ = ['"Hans Lellelid" <[email protected]>'] +__copyright__ = "Copyright 2009 Hans Lellelid" +__license__ = """Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License.""" + +lock = threading.RLock() + + +class QueueStore(object): + """ + Abstract base class for queue storage. + + Extensions/implementations of this class must be thread-safe. + + @ivar log: A logger for this class. + @type log: C{logging.Logger} + """ + __metaclass__ = abc.ABCMeta + + def __init__(self): + """ + A base constructor that sets up logging. + + If you extend this class, you should either call this method or at minimum make sure these values + get set. + """ + self.log = logging.getLogger('%s.%s' % ( + self.__module__, self.__class__.__name__)) + + @abc.abstractmethod + @synchronized(lock) + def enqueue(self, destination, frame): + """ + Store message (frame) for specified destinationination. 
+ + @param destination: The destinationination queue name for this message (frame). + @type destination: C{str} + + @param frame: The message (frame) to send to specified destinationination. + @type frame: C{stompclient.frame.Frame} + """ + + @abc.abstractmethod + @synchronized(lock) + def dequeue(self, destination): + """ + Removes and returns an item from the queue (or C{None} if no items in queue). + + @param destination: The queue name (destinationination). + @type destination: C{str} + + @return: The first frame in the specified queue, or C{None} if there are none. + @rtype: C{stompclient.frame.Frame} + """ + + @synchronized(lock) + def requeue(self, destination, frame): + """ + Requeue a message (frame) for storing at specified destinationination. + + @param destination: The destinationination queue name for this message (frame). + @type destination: C{str} + + @param frame: The message (frame) to send to specified destinationination. + @type frame: C{stompclient.frame.Frame} + """ + self.enqueue(destination, frame) + + @synchronized(lock) + def size(self, destination): + """ + Size of the queue for specified destination. + + @param destination: The queue destination (e.g. /queue/foo) + @type destination: C{str} + + @return: The number of frames in specified queue. + @rtype: C{int} + """ + raise NotImplementedError() + + @synchronized(lock) + def has_frames(self, destination): + """ + Whether specified destination has any frames. + + Default implementation uses L{QueueStore.size} to determine if there + are any frames in queue. Subclasses may choose to optimize this. + + @param destination: The queue destination (e.g. /queue/foo) + @type destination: C{str} + + @return: The number of frames in specified queue. + @rtype: C{int} + """ + return self.size(destination) > 0 + + @synchronized(lock) + def destinations(self): + """ + Provides a set of destinations (queue "addresses") available. + + @return: A list of the detinations available. 
+ @rtype: C{set} + """ + raise NotImplementedError + + @synchronized(lock) + def close(self): + """ + May be implemented to perform any necessary cleanup operations when store is closed. + """ + pass + + # This is intentionally not synchronized, since it does not directly + # expose any shared data. + def frames(self, destination): + """ + Returns an iterator for frames in specified queue. + + The iterator simply wraps calls to L{dequeue} method, so the order of the + frames from the iterator will be the reverse of the order in which the + frames were enqueued. + + @param destination: The queue destination (e.g. /queue/foo) + @type destination: C{str} + """ + return QueueFrameIterator(self, destination) + + +class QueueFrameIterator(object): + """ + Provides an C{iterable} over the frames for a specified destination in a queue. + + @ivar store: The queue store. + @type store: L{coilmq.store.QueueStore} + + @ivar destination: The destination for this iterator. + @type destination: C{str} + """ + + def __init__(self, store, destination): + self.store = store + self.destination = destination + + def __iter__(self): + return self + + def next(self): + return self.__next__() + + def __next__(self): + frame = self.store.dequeue(self.destination) + if not frame: + raise StopIteration() + return frame + + def __len__(self): + return self.store.size(self.destination) + + +class TopicStore(object): + """ + Abstract base class for non-durable topic storage. + """ + + +class DurableTopicStore(TopicStore): + """ + Abstract base class for durable topic storage. 
+ """ http://git-wip-us.apache.org/repos/asf/ambari/blob/c7612bcf/ambari-common/src/test/python/coilmq/store/dbm.py ---------------------------------------------------------------------- diff --git a/ambari-common/src/test/python/coilmq/store/dbm.py b/ambari-common/src/test/python/coilmq/store/dbm.py new file mode 100644 index 0000000..ab4fb00 --- /dev/null +++ b/ambari-common/src/test/python/coilmq/store/dbm.py @@ -0,0 +1,262 @@ +""" +Queue storage module that stores the queue information and frames in a DBM-style database. + +The current implementation uses the Python `shelve` module, which uses a DBM implementation +under the hood (specifically the `anydbm` module, aka `dbm` in Python 3.x). + +Because of how the `shelve` module works (and how we're using it) and caveats in the Python +documentation this is likely a BAD storage module to use if you are expecting to traffic in +large frames. +""" +import threading +import logging +import os +import os.path +import shelve +from collections import deque +from datetime import datetime, timedelta + +try: + from configparser import ConfigParser +except ImportError: + from ConfigParser import ConfigParser + + +from coilmq.store import QueueStore +from coilmq.config import config +from coilmq.exception import ConfigError +from coilmq.util.concurrency import synchronized + +__authors__ = ['"Hans Lellelid" <[email protected]>'] +__copyright__ = "Copyright 2009 Hans Lellelid" +__license__ = """Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+See the License for the specific language governing permissions and +limitations under the License.""" + +lock = threading.RLock() + + +def make_dbm(): + """ + Creates a DBM queue store, pulling config values from the CoilMQ configuration. + """ + try: + data_dir = config.get('coilmq', 'qstore.dbm.data_dir') + cp_ops = config.getint('coilmq', 'qstore.dbm.checkpoint_operations') + cp_timeout = config.getint('coilmq', 'qstore.dbm.checkpoint_timeout') + except ConfigParser.NoOptionError as e: + raise ConfigError('Missing configuration parameter: %s' % e) + + if not os.path.exists(data_dir): + raise ConfigError('DBM directory does not exist: %s' % data_dir) + # FIXME: how do these get applied? Is OR appropriate? + if not os.access(data_dir, os.W_OK | os.R_OK): + raise ConfigError('Cannot read and write DBM directory: %s' % data_dir) + + store = DbmQueue(data_dir, checkpoint_operations=cp_ops, + checkpoint_timeout=cp_timeout) + return store + + +class DbmQueue(QueueStore): + """ + A QueueStore implementation that stores messages and queue information in DBM-style + database. + + Several database files will be used to support this functionality: metadata about the + queues will be stored in its own database and each queue will also have its own + database file. + + This classes uses a C{threading.RLock} to guard access to the memory store, since it + appears that at least some of the underlying implementations that anydbm uses are not + thread-safe + + Due to some impedence mismatch between the types of data we need to store in queues + (specifically lists) and the types of data that are best stored in DBM databases + (specifically dicts), this class uses the `shelve` module to abstract away some + of the ugliness. The consequence of this is that we only persist objects periodically + to the datastore, for performance reasons. How periodic is determined by the + `checkpoint_operations` and `checkpoint_timeout` instance variables (and params to + L{__init__}). 
+ + @ivar data_dir: The directory where DBM files will be stored. + @type data_dir: C{str} + + @ivar queue_metadata: A Shelf (DBM) database that tracks stats & delivered message ids + for all the queues. + @ivar queue_metadata: C{shelve.Shelf} + + @ivar frame_store: A Shelf (DBM) database that contains frame contents indexed by message id. + @type frame_store: C{shelve.Shelf} + + @ivar _opcount: Internal counter for keeping track of unpersisted operations. + @type _opcount: C{int} + + @ivar checkpoint_operations: Number of operations between syncs. + @type checkpoint_operations: C{int} + + @ivar checkpoint_timeout: Max time (in seconds) that can elapse between sync of cache. + @type checkpoint_timeout: C{float} + """ + + def __init__(self, data_dir, checkpoint_operations=100, checkpoint_timeout=30): + """ + @param data_dir: The directory where DBM files will be stored. + @param data_dir: C{str} + + @param checkpoint_operations: Number of operations between syncs. + @type checkpoint_operations: C{int} + + @param checkpoint_timeout: Max time (in seconds) that can elapse between sync of cache. + @type checkpoint_timeout: C{float} + """ + QueueStore.__init__(self) + + self._opcount = 0 + self._last_sync = datetime.now() + + self.data_dir = data_dir + self.checkpoint_operations = checkpoint_operations + self.checkpoint_timeout = timedelta(seconds=checkpoint_timeout) + + # Should this be in constructor? + + # The queue metadata stores mutable (dict) objects. For this reason we set + # writeback=True and rely on the sync() method to keep the cache & disk + # in-sync. + self.queue_metadata = shelve.open(os.path.join( + self.data_dir, 'metadata'), writeback=True) + + # Since we do not need mutable objects on the frame stores (we don't modify them, we just + # put/get values), we do NOT use writeback=True here. This should also conserve on memory + # usage, since apparently that can get hefty with the caching when + # writeback=True. 
+ self.frame_store = shelve.open(os.path.join( + self.data_dir, 'frames'), writeback=False) + + @synchronized(lock) + def enqueue(self, destination, frame): + """ + Store message (frame) for specified destinationination. + + @param destination: The destinationination queue name for this message (frame). + @type destination: C{str} + + @param frame: The message (frame) to send to specified destinationination. + @type frame: C{stompclient.frame.Frame} + """ + message_id = frame.headers.get('message-id') + if not message_id: + raise ValueError("Cannot queue a frame without message-id set.") + + if not destination in self.queue_metadata: + self.log.info( + "Destination %s not in metadata; creating new entry and queue database." % destination) + self.queue_metadata[destination] = { + 'frames': deque(), 'enqueued': 0, 'dequeued': 0, 'size': 0} + + self.queue_metadata[destination]['frames'].appendleft(message_id) + self.queue_metadata[destination]['enqueued'] += 1 + + self.frame_store[message_id] = frame + + self._opcount += 1 + self._sync() + + @synchronized(lock) + def dequeue(self, destination): + """ + Removes and returns an item from the queue (or C{None} if no items in queue). + + @param destination: The queue name (destinationination). + @type destination: C{str} + + @return: The first frame in the specified queue, or C{None} if there are none. + @rtype: C{stompclient.frame.Frame} + """ + if not self.has_frames(destination): + return None + + message_id = self.queue_metadata[destination]['frames'].pop() + self.queue_metadata[destination]['dequeued'] += 1 + + frame = self.frame_store[message_id] + del self.frame_store[message_id] + + self._opcount += 1 + self._sync() + + return frame + + @synchronized(lock) + def has_frames(self, destination): + """ + Whether specified queue has any frames. + + @param destination: The queue name (destinationination). + @type destination: C{str} + + @return: Whether there are any frames in the specified queue. 
+ @rtype: C{bool} + """ + return (destination in self.queue_metadata) and bool(self.queue_metadata[destination]['frames']) + + @synchronized(lock) + def size(self, destination): + """ + Size of the queue for specified destination. + + @param destination: The queue destination (e.g. /queue/foo) + @type destination: C{str} + + @return: The number of frames in specified queue. + @rtype: C{int} + """ + if not destination in self.queue_metadata: + return 0 + else: + return len(self.queue_metadata[destination]['frames']) + + @synchronized(lock) + def close(self): + """ + Closes the databases, freeing any resources (and flushing any unsaved changes to disk). + """ + self.queue_metadata.close() + self.frame_store.close() + + @synchronized(lock) + def destinations(self): + """ + Provides a list of destinations (queue "addresses") available. + + @return: A list of the detinations available. + @rtype: C{set} + """ + return set(self.queue_metadata.keys()) + + def _sync(self): + """ + Synchronize the cached data with the underlyind database. + + Uses an internal transaction counter and compares to the checkpoint_operations + and checkpoint_timeout paramters to determine whether to persist the memory store. + + In this implementation, this method wraps calls to C{shelve.Shelf#sync}. 
+ """ + if (self._opcount > self.checkpoint_operations or + datetime.now() > self._last_sync + self.checkpoint_timeout): + self.log.debug("Synchronizing queue metadata.") + self.queue_metadata.sync() + self._last_sync = datetime.now() + self._opcount = 0 + else: + self.log.debug("NOT synchronizing queue metadata.") http://git-wip-us.apache.org/repos/asf/ambari/blob/c7612bcf/ambari-common/src/test/python/coilmq/store/memory.py ---------------------------------------------------------------------- diff --git a/ambari-common/src/test/python/coilmq/store/memory.py b/ambari-common/src/test/python/coilmq/store/memory.py new file mode 100644 index 0000000..783967c --- /dev/null +++ b/ambari-common/src/test/python/coilmq/store/memory.py @@ -0,0 +1,76 @@ +""" +Queue storage module that uses thread-safe, in-memory data structures. +""" +import threading +from collections import defaultdict, deque + +from coilmq.store import QueueStore +from coilmq.util.concurrency import synchronized + +__authors__ = ['"Hans Lellelid" <[email protected]>'] +__copyright__ = "Copyright 2009 Hans Lellelid" +__license__ = """Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License.""" + +lock = threading.RLock() + + +class MemoryQueue(QueueStore): + """ + A QueueStore implementation that stores messages in memory. + + This classes uses a C{threading.RLock} to guard access to the memory store. 
+ The locks on this class are probably excessive given that the + L{coilmq.queue.QueueManager} is already implementing coarse-grained locking + on the methods that access this storage backend. That said, we'll start + over-protective and refactor later it if proves unecessary. + """ + + def __init__(self): + QueueStore.__init__(self) + self._messages = defaultdict(deque) + + @synchronized(lock) + def enqueue(self, destination, frame): + self._messages[destination].appendleft(frame) + + @synchronized(lock) + def dequeue(self, destination): + try: + return self._messages[destination].pop() + except IndexError: + return None + + @synchronized(lock) + def size(self, destination): + """ + Size of the queue for specified destination. + + @param destination: The queue destination (e.g. /queue/foo) + @type destination: C{str} + """ + return len(self._messages[destination]) + + @synchronized(lock) + def has_frames(self, destination): + """ Whether this queue has frames for the specified destination. """ + return bool(self._messages[destination]) + + @synchronized(lock) + def destinations(self): + """ + Provides a list of destinations (queue "addresses") available. + + @return: A list of the detinations available. 
+ @rtype: C{set} + """ + return set(self._messages.keys()) http://git-wip-us.apache.org/repos/asf/ambari/blob/c7612bcf/ambari-common/src/test/python/coilmq/store/rds.py ---------------------------------------------------------------------- diff --git a/ambari-common/src/test/python/coilmq/store/rds.py b/ambari-common/src/test/python/coilmq/store/rds.py new file mode 100644 index 0000000..f21bee3 --- /dev/null +++ b/ambari-common/src/test/python/coilmq/store/rds.py @@ -0,0 +1,69 @@ +try: + import redis +except ImportError: # pragma: no cover + import sys; sys.exit('please, install redis-py package to use redis-store') +import threading +try: + import cPickle as pickle +except ImportError: + import pickle + +from coilmq.store import QueueStore +from coilmq.util.concurrency import synchronized +from coilmq.config import config + +__authors__ = ('"Hans Lellelid" <[email protected]>', '"Alexander Zhukov" <[email protected]>') +__copyright__ = "Copyright 2009 Hans Lellelid" +__license__ = """Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+See the License for the specific language governing permissions and +limitations under the License.""" + +lock = threading.RLock() + + +def make_redis_store(cfg=None): + return RedisQueueStore( + redis_conn=redis.Redis(**dict((cfg or config).items('redis')))) + + +class RedisQueueStore(QueueStore): + """Simple Queue with Redis Backend""" + def __init__(self, redis_conn=None): + """The default connection parameters are: host='localhost', port=6379, db=0""" + self.__db = redis_conn or redis.Redis() + # self.key = '{0}:{1}'.format(namespace, name) + super(RedisQueueStore, self).__init__() + + @synchronized(lock) + def enqueue(self, destination, frame): + self.__db.rpush(destination, pickle.dumps(frame)) + + @synchronized(lock) + def dequeue(self, destination): + item = self.__db.lpop(destination) + if item: + return pickle.loads(item) + + @synchronized(lock) + def requeue(self, destination, frame): + self.enqueue(destination, frame) + + @synchronized(lock) + def size(self, destination): + return self.__db.llen(destination) + + @synchronized(lock) + def has_frames(self, destination): + return self.size(destination) > 0 + + @synchronized(lock) + def destinations(self): + return self.__db.keys() http://git-wip-us.apache.org/repos/asf/ambari/blob/c7612bcf/ambari-common/src/test/python/coilmq/store/sa/__init__.py ---------------------------------------------------------------------- diff --git a/ambari-common/src/test/python/coilmq/store/sa/__init__.py b/ambari-common/src/test/python/coilmq/store/sa/__init__.py new file mode 100644 index 0000000..77c6ce9 --- /dev/null +++ b/ambari-common/src/test/python/coilmq/store/sa/__init__.py @@ -0,0 +1,205 @@ +""" +Queue storage module that uses SQLAlchemy to access queue information and frames in a database. 
+ + +""" +import threading +import logging +import os +import os.path +import shelve +from collections import deque +from datetime import datetime, timedelta + +# try: +# from configparser import ConfigParser +# except ImportError: +# from ConfigParser import ConfigParser + +from sqlalchemy import engine_from_config, MetaData +from sqlalchemy.orm import scoped_session, sessionmaker +from sqlalchemy.sql import select, func, distinct + +from coilmq.store import QueueStore +from coilmq.config import config +from coilmq.exception import ConfigError +from coilmq.util.concurrency import synchronized +from coilmq.store.sa import meta, model + +__authors__ = ['"Hans Lellelid" <[email protected]>'] +__copyright__ = "Copyright 2009 Hans Lellelid" +__license__ = """Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License.""" + + +def make_sa(): + """ + Factory to creates a SQLAlchemy queue store, pulling config values from the CoilMQ configuration. + """ + configuration = dict(config.items('coilmq')) + engine = engine_from_config(configuration, 'qstore.sqlalchemy.') + init_model(engine) + store = SAQueue() + return store + + +def init_model(engine, create=True, drop=False): + """ + Initializes the shared SQLAlchemy state in the L{coilmq.store.sa.model} module. + + @param engine: The SQLAlchemy engine instance. + @type engine: C{sqlalchemy.Engine} + + @param create: Whether to create the tables (if they do not exist). + @type create: C{bool} + + @param drop: Whether to drop the tables (if they exist). 
class SAQueue(QueueStore):
    """
    A QueueStore implementation that stores messages in a database and uses SQLAlchemy to interface
    with the database.

    Note that this implementation does not actually use the ORM capabilities of SQLAlchemy, but simply
    uses SQLAlchemy for the DB abstraction for SQL building and DDL (table creation).

    The L{coilmq.store.sa.model.setup_tables} function is used to actually define (& create) the
    database tables. This class also depends on L{init_model} having been called to define the
    L{coilmq.store.sa.meta.Session} class-like callable (and the engine & metadata).

    Finally, this class does not explicitly use shared data (db connections); a Session is obtained
    in each method. The actual implementation is handled using SQLAlchemy scoped sessions, which provide
    thread-local Session class-like callables. As a result of deferring that to the SA layer, no
    synchronization locks are used to guard calls to the methods in this store implementation.
    """

    def enqueue(self, destination, frame):
        """
        Store message (frame) for specified destination.

        @param destination: The destination queue name for this message (frame).
        @type destination: C{str}

        @param frame: The message (frame) to send to specified destination.
        @type frame: C{stompclient.frame.Frame}

        @raise ValueError: If the frame carries no C{message-id} header.
        """
        session = meta.Session()
        message_id = frame.headers.get('message-id')
        if not message_id:
            raise ValueError("Cannot queue a frame without message-id set.")
        ins = model.frames_table.insert().values(
            message_id=message_id, destination=destination, frame=frame)
        try:
            session.execute(ins)
            session.commit()
        except BaseException:
            # The scoped session is reused by later calls on this thread; a
            # failed insert (e.g. duplicate message_id) must not leave it in
            # a failed transaction. Same roll-back-and-re-raise as dequeue().
            session.rollback()
            raise

    def dequeue(self, destination):
        """
        Removes and returns an item from the queue (or C{None} if no items in queue).

        @param destination: The queue name (destination).
        @type destination: C{str}

        @return: The first frame in the specified queue, or C{None} if there are none.
        @rtype: C{stompclient.frame.Frame}
        """
        session = meta.Session()
        frames = model.frames_table

        try:
            # Only the oldest row is needed, so bound the result set.
            selstmt = select([frames.c.message_id, frames.c.frame])
            selstmt = selstmt.where(frames.c.destination == destination)
            selstmt = selstmt.order_by(frames.c.queued, frames.c.sequence)
            selstmt = selstmt.limit(1)

            first = session.execute(selstmt).fetchone()
            if first is None:
                frame = None
            else:
                delstmt = frames.delete().where(
                    frames.c.message_id == first[frames.c.message_id])
                session.execute(delstmt)
                frame = first[frames.c.frame]
        except BaseException:
            session.rollback()
            raise
        else:
            # Commit on the empty path too, so the transaction opened by the
            # SELECT does not stay open on the thread-local session.
            session.commit()
        return frame

    def has_frames(self, destination):
        """
        Whether specified queue has any frames.

        @param destination: The queue name (destination).
        @type destination: C{str}

        @return: Whether there are any frames in the specified queue.
        @rtype: C{bool}
        """
        session = meta.Session()
        sel = select([model.frames_table.c.message_id]).where(
            model.frames_table.c.destination == destination).limit(1)
        return session.execute(sel).fetchone() is not None

    def size(self, destination):
        """
        Size of the queue for specified destination.

        @param destination: The queue destination (e.g. /queue/foo)
        @type destination: C{str}

        @return: The number of frames in specified queue.
        @rtype: C{int}
        """
        session = meta.Session()
        sel = select([func.count(model.frames_table.c.message_id)]).where(
            model.frames_table.c.destination == destination)
        first = session.execute(sel).fetchone()
        if not first:
            return 0
        return int(first[0])

    def destinations(self):
        """
        Provides the set of destinations (queue "addresses") that currently hold frames.

        @return: The destinations available.
        @rtype: C{set}
        """
        session = meta.Session()
        sel = select([distinct(model.frames_table.c.destination)])
        return set(row[0] for row in session.execute(sel).fetchall())

    def close(self):
        """
        Releases the calling thread's scoped session via C{meta.Session.remove()}.
        """
        meta.Session.remove()
def setup_tables(create=True, drop=False):
    """
    Defines the C{frames} table on the registered metadata and (potentially)
    creates the db tables.

    This function expects that L{meta.metadata} and L{meta.engine} have been bound.

    @param create: Whether to create the tables (if they do not exist).
    @type create: C{bool}

    @param drop: Whether to drop the tables (if they exist). Dropping always
                 re-creates them afterwards.
    @type drop: C{bool}
    """
    global frames_table

    # NOTE(review): SQLAlchemy documents `autoincrement` for integer primary
    # key columns; confirm that the non-PK 'sequence' column is actually
    # populated, since dequeue ordering uses it as a tie-breaker.
    columns = (
        Column('message_id', String(255), primary_key=True),
        Column('sequence', BigInteger,
               primary_key=False, autoincrement=True),
        Column('destination', String(255), index=True),
        Column('frame', PickleType),
        Column('queued', DateTime, default=func.now()),
    )
    frames_table = Table('frames', meta.metadata, *columns)

    if drop:
        meta.metadata.drop_all()
        meta.metadata.create_all()
    elif create:
        meta.metadata.create_all()
class TopicManager(object):
    """
    Class that manages distribution of messages to topic subscribers.

    The public methods are guarded by the module-level C{threading.RLock} named
    C{lock}, passed to the L{synchronized} decorator. A re-entrant lock is
    needed because L{send} calls L{disconnect} while already holding it.

    @ivar _topics: A dict of registered topics, keyed by destination.
    @type _topics: C{dict} of C{str} to C{set} of L{coilmq.server.StompConnection}
    """

    def __init__(self):
        self.log = logging.getLogger(
            '%s.%s' % (__name__, self.__class__.__name__))

        # Kept for backward compatibility; the methods below are guarded by
        # the module-level `lock`, not by this attribute.
        self._lock = threading.RLock()

        self._topics = defaultdict(set)

        # TODO: If we want durable topics, we'll need a store for topics.

    @synchronized(lock)
    def close(self):
        """
        Closes all resources associated with this topic manager.

        (Currently this is simply here for API conformity w/ L{coilmq.queue.QueueManager}.)
        """
        self.log.info("Shutting down topic manager.")  # pragma: no cover

    @synchronized(lock)
    def subscribe(self, connection, destination):
        """
        Subscribes a connection to the specified topic destination.

        @param connection: The client connection to subscribe.
        @type connection: L{coilmq.server.StompConnection}

        @param destination: The topic destination (e.g. '/topic/foo')
        @type destination: C{str}
        """
        self.log.debug("Subscribing %s to %s" % (connection, destination))
        self._topics[destination].add(connection)

    @synchronized(lock)
    def unsubscribe(self, connection, destination):
        """
        Unsubscribes a connection from the specified topic destination.

        @param connection: The client connection to unsubscribe.
        @type connection: L{coilmq.server.StompConnection}

        @param destination: The topic destination (e.g. '/topic/foo')
        @type destination: C{str}
        """
        self.log.debug("Unsubscribing %s from %s" % (connection, destination))
        # .get() avoids materializing a defaultdict entry for unknown topics.
        subscribers = self._topics.get(destination)
        if subscribers is None:
            return
        subscribers.discard(connection)
        if not subscribers:
            del self._topics[destination]

    @synchronized(lock)
    def disconnect(self, connection):
        """
        Removes a subscriber connection from every topic.

        @param connection: The client connection to unsubscribe.
        @type connection: L{coilmq.server.StompConnection}
        """
        self.log.debug("Disconnecting %s" % connection)
        # Iterate over a copy of the keys, since entries are deleted below.
        for dest in list(self._topics):
            subscribers = self._topics[dest]
            subscribers.discard(connection)
            if not subscribers:
                del self._topics[dest]

    @synchronized(lock)
    def send(self, message):
        """
        Sends a message to all subscribers of destination.

        @param message: The message frame. (The frame will be modified to set command
                            to MESSAGE and set a message id.)
        @type message: L{stompclient.frame.Frame}

        @raise ValueError: If the frame has no C{destination} header.
        """
        dest = message.headers.get('destination')
        if not dest:
            raise ValueError(
                "Cannot send frame with no destination: %s" % message)

        message.cmd = 'message'

        message.headers.setdefault('message-id', str(uuid.uuid4()))

        # .get() keeps the defaultdict from growing an empty entry for every
        # destination that has no subscribers. The tuple is a snapshot, so a
        # re-entrant (un)subscribe during delivery cannot break the iteration.
        subscribers = tuple(self._topics.get(dest, ()))

        bad_subscribers = set()
        for subscriber in subscribers:
            try:
                subscriber.send_frame(message)
            except Exception:
                # `Exception`, not a bare except: KeyboardInterrupt/SystemExit
                # must propagate rather than be treated as a delivery failure.
                self.log.exception(
                    "Error delivering message to subscriber %s; client will be disconnected." % subscriber)
                # Deletion is deferred until every delivery was attempted.
                bad_subscribers.add(subscriber)

        for subscriber in bad_subscribers:
            self.disconnect(subscriber)
def synchronized(lock):
    """
    Builds a decorator that runs the decorated callable while holding C{lock}.

    The lock is the one passed in here; nothing is read from the decorated
    object. Use a C{threading.RLock} when decorated callables call each other.

    @param lock: The lock to hold for the duration of each call.
    @type lock: C{threading.Lock} or C{threading.RLock}

    @return: The decorator.
    @rtype: C{callable}
    """
    def synchronize(func):
        """
        Decorator to lock and unlock a method (Phillip J. Eby).

        @param func: Method to decorate
        @type func: C{callable}
        """
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            with lock:
                return func(*args, **kwargs)
        return wrapper
    return synchronize


class CoilTimerBase(object):
    """
    Base class for periodic job runners; usable as a context manager that
    starts on entry and stops on exit.

    @ivar jobs: The scheduled C{(period, callback)} pairs.
    @type jobs: C{list}
    """

    # NOTE: `__metaclass__` is only honoured by Python 2; on Python 3 the
    # abstract methods below are not enforced at instantiation time.
    __metaclass__ = abc.ABCMeta

    def __init__(self):
        self.jobs = []

    def schedule(self, period, callback):
        """
        Registers a job; it begins running on the next L{start}.

        @param period: Interval between invocations, in seconds.
        @param callback: Zero-argument callable to invoke.
        @type callback: C{callable}
        """
        self.jobs.append((period, callback))

    @abc.abstractmethod
    def run(self):
        raise NotImplementedError

    @abc.abstractmethod
    def start(self):
        raise NotImplementedError

    @abc.abstractmethod
    def stop(self):
        raise NotImplementedError

    def __enter__(self):
        self.start()
        # Return the timer so `with timer as t:` binds it rather than None.
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.stop()


# TODO: check against following notes
# <http://stackoverflow.com/questions/2124540/how-does-timer-in-python-work-regarding-mutlithreading>
# <http://stackoverflow.com/questions/12435211/python-threading-timer-repeat-function-every-n-seconds>
class CoilThreadingTimer(CoilTimerBase):
    """
    Timer implementation that re-arms a C{threading.Timer} for each job.

    Each job is invoked once immediately by L{run} and then again every
    C{period} seconds for as long as the timer is running.
    """

    def __init__(self, *args, **kwargs):
        super(CoilThreadingTimer, self).__init__(*args, **kwargs)
        self._running = False
        # Pending Timer threads, tracked so that stop() can cancel them.
        self._timers = set()
        self._timers_lock = threading.Lock()

    def _schedule(self, interval, callback):
        """Arms the next timer for a job; returns False when not running."""
        with self._timers_lock:
            if not self._running:
                return False
            timer = threading.Timer(
                interval, self._tick, args=(interval, callback))
            self._timers.add(timer)
            timer.start()
        return True

    def _tick(self, interval, callback):
        """Timer-thread entry point: re-arm, then invoke the job."""
        with self._timers_lock:
            # This runs inside the Timer thread, so the current thread is the
            # timer that just fired.
            self._timers.discard(threading.current_thread())
        # Unlike the first invocation in run(), a timer that fires after
        # stop() does not invoke the job again.
        if self._schedule(interval, callback):
            callback()

    def run(self):
        for period, job in self.jobs:
            self._schedule(period, job)
            job()

    def start(self):
        self._running = True
        self.run()

    def stop(self):
        with self._timers_lock:
            self._running = False
            pending, self._timers = self._timers, set()
        # Cancel outside the lock; otherwise pending non-daemon timer threads
        # would delay interpreter exit for up to one period.
        for timer in pending:
            timer.cancel()
SEND = 'SEND'
CONNECT = 'CONNECT'
MESSAGE = 'MESSAGE'
ERROR = 'ERROR'
CONNECTED = 'CONNECTED'
SUBSCRIBE = 'SUBSCRIBE'
UNSUBSCRIBE = 'UNSUBSCRIBE'
BEGIN = 'BEGIN'
COMMIT = 'COMMIT'
ABORT = 'ABORT'
ACK = 'ACK'
NACK = 'NACK'
DISCONNECT = 'DISCONNECT'

VALID_COMMANDS = ['message', 'connect', 'connected', 'error', 'send',
                  'subscribe', 'unsubscribe', 'begin', 'commit', 'abort', 'ack', 'disconnect', 'nack']

TEXT_PLAIN = 'text/plain'


class IncompleteFrame(Exception):
    """The frame has incomplete body"""


class BodyNotTerminated(Exception):
    """The frame's body is not terminated with the NULL character"""


class EmptyBuffer(Exception):
    """The buffer is empty"""


def parse_headers(buff):
    """
    Parses buffer and returns command and headers as strings.

    Lines are consumed up to the first blank line (or end of data).

    @param buff: A binary stream positioned at the start of a frame.

    @return: A C{(command, headers)} tuple; headers keep their order.
    @rtype: C{tuple} of C{str} and C{OrderedDict}

    @raise EmptyBuffer: If no command line could be read.
    @raise ValueError: If a header line contains no ':' separator.
    """
    preamble_lines = [
        six.u(line).decode()
        for line in iter(lambda: buff.readline().strip(), b'')
    ]
    if not preamble_lines:
        raise EmptyBuffer()
    # Split on the FIRST ':' only: header values may themselves contain ':'
    # (e.g. "host:localhost:61613"), which previously raised ValueError.
    headers = OrderedDict(line.split(':', 1) for line in preamble_lines[1:])
    return preamble_lines[0], headers


def parse_body(buff, headers):
    """
    Reads a frame body from the buffer, leaving the stream just past the
    frame's terminator.

    @param buff: A seekable binary stream positioned at the start of the body.
    @param headers: The frame headers; C{content-length} is honoured if present.
    @type headers: C{dict}

    @return: The body bytes (without the terminator).

    @raise IncompleteFrame: If fewer than C{content-length} bytes are available.
    @raise BodyNotTerminated: If no terminator follows the body.
    """
    content_length = int(headers.get('content-length', -1))
    # A negative length makes read() return everything that is buffered.
    body = buff.read(content_length)
    if content_length >= 0:
        if len(body) < content_length:
            raise IncompleteFrame()
        terminator = six.u(buff.read(1)).decode()
        if not terminator:
            raise BodyNotTerminated()
    else:
        # no content length
        body, terminator, rest = body.partition(b'\x00')
        if not terminator:
            raise BodyNotTerminated()
        else:
            # Rewind to just after the terminator (whence=2: from the end).
            buff.seek(-len(rest), 2)

    return body


class Frame(object):
    """
    A STOMP frame (or message).

    :param cmd: the protocol command
    :param headers: a map of headers for the frame
    :param body: the content of the frame.
    """

    def __init__(self, cmd, headers=None, body=None):
        self.cmd = cmd
        self.headers = headers or {}
        self.body = body or ''

    def __str__(self):
        return '{{cmd={0},headers=[{1}],body={2}}}'.format(
            self.cmd,
            self.headers,
            self.body if isinstance(
                self.body, six.binary_type) else six.b(self.body)
        )

    def __eq__(self, other):
        """ Override equality checking to test for matching command, headers, and body. """
        # Short-circuit on the type check: comparing against a non-Frame
        # (e.g. None) must yield False instead of raising AttributeError.
        return (isinstance(other, Frame) and
                self.cmd == other.cmd and
                self.headers == other.headers and
                self.body == other.body)

    def __ne__(self, other):
        """ Python 2 does not derive != from ==, so define it explicitly. """
        return not self.__eq__(other)

    @property
    def transaction(self):
        return self.headers.get('transaction')

    @classmethod
    def from_buffer(cls, buff):
        """
        Parses one frame from a binary stream.

        @raise EmptyBuffer: see L{parse_headers}.
        @raise IncompleteFrame: see L{parse_body}.
        @raise BodyNotTerminated: see L{parse_body}.
        """
        cmd, headers = parse_headers(buff)
        body = parse_body(buff, headers)
        return cls(cmd, headers=headers, body=body)

    def pack(self):
        """
        Create a string representation from object state.

        Sets a C{content-length} header on the frame if none is present.

        @return: The string (bytes) for this stomp frame.
        @rtype: C{str}
        """

        self.headers.setdefault('content-length', len(self.body))

        # Convert and append any existing headers to a string as the
        # protocol describes.
        headerparts = ("{0}:{1}\n".format(key, value)
                       for key, value in self.headers.items())

        if isinstance(self.body, six.binary_type):
            body = self.body
        else:
            body = six.b(self.body)

        # Frame is Command + Header + Body + EOF marker.
        preamble = six.b("{0}\n{1}\n".format(self.cmd, "".join(headerparts)))
        return preamble + body + six.b('\x00')


class ConnectedFrame(Frame):
    """ A CONNECTED server frame (response to CONNECT).

    The session ID is stored in the C{session} header.
    """

    def __init__(self, session, extra_headers=None):
        """
        @param session: The (throw-away) session ID to include in response.
        @type session: C{str}

        @param extra_headers: Additional headers for the frame.
        @type extra_headers: C{dict}
        """
        super(ConnectedFrame, self).__init__(
            cmd='connected', headers=extra_headers or {})
        self.headers['session'] = session


class HeaderValue(object):
    """
    A wrapper that can be used when a calculated header value is needed.

    The value is computed each time the object is converted with C{str()},
    which is what happens when a frame's headers are formatted.

    For example, to use this class to generate the content-length header:

        >>> body = 'asdf'
        >>> headers = {}
        >>> headers['content-length'] = HeaderValue(calculator=lambda: len(body))
        >>> str(headers['content-length'])
        '4'

    @ivar calc: The calculator function.
    @type calc: C{callable}
    """

    def __init__(self, calculator):
        """
        @param calculator: The calculator callable that will yield the desired value.
        @type calculator: C{callable}

        @raise ValueError: If C{calculator} is not callable.
        """
        if not callable(calculator):
            raise ValueError("Non-callable param: %s" % calculator)
        self.calc = calculator

    def __get__(self, obj, objtype):
        return self.calc()

    def __str__(self):
        return str(self.calc())

    def __set__(self, obj, value):
        self.calc = value

    def __repr__(self):
        return '<%s calculator=%s>' % (self.__class__.__name__, self.calc)


class ErrorFrame(Frame):
    """ An ERROR server frame. """

    def __init__(self, message, body=None, extra_headers=None):
        """
        @param message: Short error description, sent in the C{message} header.
        @type message: C{str}

        @param body: The message body bytes.
        @type body: C{str}

        @param extra_headers: Additional headers for the frame.
        @type extra_headers: C{dict}
        """
        super(ErrorFrame, self).__init__(cmd='error',
                                         headers=extra_headers or {}, body=body)
        self.headers['message'] = message
        # Computed lazily so the header follows later changes to the body.
        self.headers[
            'content-length'] = HeaderValue(calculator=lambda: len(self.body))

    def __repr__(self):
        return '<%s message=%r>' % (self.__class__.__name__, self.headers['message'])


class ReceiptFrame(Frame):
    """ A RECEIPT server frame. """

    def __init__(self, receipt, extra_headers=None):
        """
        @param receipt: The receipt message ID.
        @type receipt: C{str}

        @param extra_headers: Additional headers for the frame.
        @type extra_headers: C{dict}
        """
        super(ReceiptFrame, self).__init__(
            'RECEIPT', headers=extra_headers or {})
        self.headers['receipt-id'] = receipt
class FrameBuffer(object):
    """
    A customized version of the StompBuffer class from Stomper project that returns frame objects
    and supports iteration.

    This version of the parser also assumes that stomp messages with no content-length
    end in a simple \\x00 char, not \\x00\\n as is assumed by
    C{stomper.stompbuffer.StompBuffer}. Additionally, this class differs from Stomper version
    by conforming to PEP-8 coding style.

    This class can be used to smooth over a transport that may provide partial frames (or
    may provide multiple frames in one data buffer).

    @ivar _buffer: The internal byte buffer.
    @type _buffer: C{io.BytesIO}

    @ivar _pointer: Offset in C{_buffer} of the first byte not yet consumed by L{extract_frame}.
    @type _pointer: C{int}

    @ivar debug: Log extra parsing debug (logs will be DEBUG level).
    @type debug: C{bool}
    """

    # regexp to check that the buffer starts with a command.
    command_re = re.compile(r'^(.+?)\n')

    # regexp to remove everything up to and including the first
    # instance of '\x00' (used in resynching the buffer).
    sync_re = re.compile(r'^.*?\x00')

    # regexp to determine the content length. The buffer should always start
    # with a command followed by the headers, so the content-length header will
    # always be preceded by a newline. It may not always be followed by a
    # newline, though!
    content_length_re = re.compile(r'\ncontent-length\s*:\s*(\d+)\s*(\n|$)')

    def __init__(self):
        self._buffer = io.BytesIO()
        self._pointer = 0
        self.debug = False
        self.log = logging.getLogger('%s.%s' % (
            self.__module__, self.__class__.__name__))

    def clear(self):
        """
        Clears (empties) the internal buffer.
        """
        # A fresh buffer and a reset read pointer; the old code assigned the
        # `io` module itself, which broke every later call.
        self._buffer = io.BytesIO()
        self._pointer = 0

    def buffer_len(self):
        """
        @return: Number of bytes in the internal buffer that have not yet been
                 consumed by L{extract_frame}.
        @rtype: C{int}
        """
        # BytesIO does not support len(); measure its content instead.
        return len(self._buffer.getvalue()) - self._pointer

    def buffer_empty(self):
        """
        @return: C{True} if buffer is empty, C{False} otherwise.
        @rtype: C{bool}
        """
        return self.buffer_len() == 0

    def append(self, data):
        """
        Appends bytes to the internal buffer (may or may not contain full stomp frames).

        @param data: The bytes to append.
        @type data: C{str}
        """
        # extract_frame() leaves the stream positioned at the read pointer;
        # writing there would overwrite unread data, so move to the end first.
        self._buffer.seek(0, io.SEEK_END)
        self._buffer.write(data)

    def extract_frame(self):
        """
        Pulls one complete frame off the buffer and returns it.

        If there is no complete message in the buffer, returns None.

        Note that the buffer can contain more than one message. You
        should therefore call this method in a loop (or use iterator
        functionality exposed by class) until None returned.

        @return: The next complete frame in the buffer.
        @rtype: L{stomp.frame.Frame}
        """
        self._buffer.seek(self._pointer, 0)
        try:
            f = Frame.from_buffer(self._buffer)
            self._pointer = self._buffer.tell()
        except (IncompleteFrame, EmptyBuffer):
            # Nothing consumed: rewind so the next attempt re-reads the frame.
            self._buffer.seek(self._pointer, 0)
            return None

        return f

    def __iter__(self):
        """
        Returns an iterator object.
        """
        return self

    def __next__(self):
        """
        Return the next STOMP message in the buffer (supporting iteration).

        @rtype: L{stomp.frame.Frame}
        """
        msg = self.extract_frame()
        if not msg:
            raise StopIteration()
        return msg

    def next(self):
        """ Python 2 spelling of L{__next__}. """
        return self.__next__()
a/pom.xml +++ b/pom.xml @@ -297,6 +297,8 @@ <!--Python Mock library (BSD license)--> <exclude>ambari-common/src/test/python/mock/**</exclude> + <!--Coilmq Mock library (Apache license)--> + <exclude>ambari-common/src/test/python/coilmq/**</exclude> <!--Jinja2 library (BSD license)--> <exclude>ambari-common/src/main/python/ambari_jinja2/**</exclude> <exclude>ambari-common/src/main/python/jinja2/**</exclude>
