AMBARI-20645. Integrate coilmq stomp server as a mock server for ambari-agent unittests. (aonishuk)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/c7612bcf Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/c7612bcf Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/c7612bcf Branch: refs/heads/branch-3.0-perf Commit: c7612bcf9b3e91e75416b484ea03bfc2169e1c29 Parents: b4e15b7 Author: Andrew Onishuk <[email protected]> Authored: Sat Apr 1 10:07:38 2017 +0300 Committer: Andrew Onishuk <[email protected]> Committed: Sat Apr 1 10:07:38 2017 +0300 ---------------------------------------------------------------------- NOTICE.txt | 10 +- .../ambari_agent/BaseStompServerTestCase.py | 225 +++++++++++ .../ambari_agent/TestAgentStompResponses.py | 35 ++ .../src/test/python/coilmq/__init__.py | 16 + .../src/test/python/coilmq/auth/__init__.py | 34 ++ .../src/test/python/coilmq/auth/simple.py | 101 +++++ .../src/test/python/coilmq/config/__init__.py | 152 ++++++++ .../test/python/coilmq/config/coilmq.cfg-sample | 71 ++++ .../src/test/python/coilmq/config/defaults.cfg | 41 ++ ambari-common/src/test/python/coilmq/engine.py | 94 +++++ .../src/test/python/coilmq/exception.py | 43 +++ .../src/test/python/coilmq/protocol/__init__.py | 342 +++++++++++++++++ ambari-common/src/test/python/coilmq/queue.py | 376 +++++++++++++++++++ .../src/test/python/coilmq/scheduler.py | 141 +++++++ .../src/test/python/coilmq/server/__init__.py | 44 +++ .../test/python/coilmq/server/socket_server.py | 192 ++++++++++ ambari-common/src/test/python/coilmq/start.py | 226 +++++++++++ .../src/test/python/coilmq/store/__init__.py | 189 ++++++++++ .../src/test/python/coilmq/store/dbm.py | 262 +++++++++++++ .../src/test/python/coilmq/store/memory.py | 76 ++++ .../src/test/python/coilmq/store/rds.py | 69 ++++ .../src/test/python/coilmq/store/sa/__init__.py | 205 ++++++++++ .../src/test/python/coilmq/store/sa/meta.py | 9 + .../src/test/python/coilmq/store/sa/model.py | 53 +++ ambari-common/src/test/python/coilmq/topic.py | 144 +++++++ .../src/test/python/coilmq/util/__init__.py | 16 + .../src/test/python/coilmq/util/concurrency.py | 96 +++++ .../src/test/python/coilmq/util/frames.py | 359 ++++++++++++++++++ .../src/test/python/coilmq/util/six.py | 16 + pom.xml | 2 + 30 files changed, 3636 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/c7612bcf/NOTICE.txt ---------------------------------------------------------------------- diff --git a/NOTICE.txt b/NOTICE.txt index 3b65239..50f982c 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -4,11 +4,16 @@ Copyright 2011-2013 The Apache Software Foundation This product includes software developed at The Apache Software Foundation (http://www.apache.org/). -Component ambari-common/src/test/python are under the following copyright: +Component ambari-common/src/test/python/mock are under the following copyright: Copyright (c) 2003-2012, Michael Foord All rights reserved. +Component ambari-common/src/test/python/coilmq are under the following copyright: + +Copyright 2009 Hans Lellelid +All rights reserved. + Resource management library derived from Kokki, which is under following copyright: @@ -23,5 +28,4 @@ Some rights reserved. This product includes Simplejson, library fast encoding and decoding of json. (https://github.com/simplejson/simplejson - MIT license) Copyright (c) 2006 Bob Ippolito. -All rights reserved. - +All rights reserved. \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/c7612bcf/ambari-agent/src/test/python/ambari_agent/BaseStompServerTestCase.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/ambari_agent/BaseStompServerTestCase.py b/ambari-agent/src/test/python/ambari_agent/BaseStompServerTestCase.py new file mode 100644 index 0000000..8198217 --- /dev/null +++ b/ambari-agent/src/test/python/ambari_agent/BaseStompServerTestCase.py @@ -0,0 +1,225 @@ +#!/usr/bin/env python + +''' +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +''' + +import sys +import time +import unittest +import logging +import socket +import select +import threading + +try: + from queue import Queue, Empty +except ImportError: + from Queue import Queue, Empty + +from coilmq.util.frames import Frame, FrameBuffer +from coilmq.queue import QueueManager +from coilmq.topic import TopicManager +from coilmq.util import frames +from coilmq.server.socket_server import StompServer, StompRequestHandler, ThreadedStompServer +from coilmq.store.memory import MemoryQueue +from coilmq.scheduler import FavorReliableSubscriberScheduler, RandomQueueScheduler +from coilmq.protocol import STOMP10 + +class BaseStompServerTestCase(unittest.TestCase): + """ + Base class for test cases provides the fixtures for setting up the multi-threaded + unit test infrastructure. + We use a combination of C{threading.Event} and C{Queue.Queue} objects to faciliate + inter-thread communication and lock-stepping the assertions. + """ + + def setUp(self): + + self.clients = [] + self.server = None # This gets set in the server thread. + self.server_address = None # This gets set in the server thread. + self.ready_event = threading.Event() + + addr_bound = threading.Event() + + def start_server(): + self.server = TestStompServer(('127.0.0.1', 21613), + ready_event=self.ready_event, + authenticator=None, + queue_manager=self._queuemanager(), + topic_manager=self._topicmanager()) + self.server_address = self.server.socket.getsockname() + addr_bound.set() + self.server.serve_forever() + + self.server_thread = threading.Thread( + target=start_server, name='server') + self.server_thread.start() + self.ready_event.wait() + addr_bound.wait() + + def _queuemanager(self): + """ + Returns the configured L{QueueManager} instance to use. + Can be overridden by subclasses that wish to change out any queue mgr parameters. + @rtype: L{QueueManager} + """ + return QueueManager(store=MemoryQueue(), + subscriber_scheduler=FavorReliableSubscriberScheduler(), + queue_scheduler=RandomQueueScheduler()) + + def _topicmanager(self): + """ + Returns the configured L{TopicManager} instance to use. + Can be overridden by subclasses that wish to change out any topic mgr parameters. + @rtype: L{TopicManager} + """ + return TopicManager() + + def tearDown(self): + for c in self.clients: + c.close() + import time; print time.time() + self.server.shutdown() # server_close takes too much time + import time; print time.time() + self.server_thread.join() + self.ready_event.clear() + del self.server_thread + + def _new_client(self, connect=True): + """ + Get a new L{TestStompClient} connected to our test server. + The client will also be registered for close in the tearDown method. + @param connect: Whether to issue the CONNECT command. + @type connect: C{bool} + @rtype: L{TestStompClient} + """ + client = TestStompClient(self.server_address) + self.clients.append(client) + if connect: + client.connect() + res = client.received_frames.get(timeout=1) + self.assertEqual(res.cmd, frames.CONNECTED) + return client + + +class TestStompServer(ThreadedStompServer): + """ + A stomp server for functional tests that uses C{threading.Event} objects + to ensure that it stays in sync with the test suite. + """ + + allow_reuse_address = True + + def __init__(self, server_address, + ready_event=None, + authenticator=None, + queue_manager=None, + topic_manager=None): + self.ready_event = ready_event + StompServer.__init__(self, server_address, StompRequestHandler, + authenticator=authenticator, + queue_manager=queue_manager, + topic_manager=topic_manager, + protocol=STOMP10) + + def server_activate(self): + self.ready_event.set() + StompServer.server_activate(self) + + +class TestStompClient(object): + """ + A stomp client for use in testing. + This client spawns a listener thread and pushes anything that comes in onto the + read_frames queue. + @ivar received_frames: A queue of Frame instances that have been received. + @type received_frames: C{Queue.Queue} containing any received C{stompclient.frame.Frame} + """ + + def __init__(self, addr, connect=True): + """ + @param addr: The (host,port) tuple for connection. + @type addr: C{tuple} + @param connect: Whether to connect socket to specified addr. + @type connect: C{bool} + """ + self.log = logging.getLogger('%s.%s' % ( + self.__module__, self.__class__.__name__)) + self.sock = None + self.addr = addr + self.received_frames = Queue() + self.read_stopped = threading.Event() + self.buffer = FrameBuffer() + if connect: + self._connect() + + def connect(self, headers=None): + self.send_frame(Frame(frames.CONNECT, headers=headers)) + + def send(self, destination, message, set_content_length=True, extra_headers=None): + headers = extra_headers or {} + headers['destination'] = destination + if set_content_length: + headers['content-length'] = len(message) + self.send_frame(Frame('send', headers=headers, body=message)) + + def subscribe(self, destination): + self.send_frame(Frame('subscribe', headers={ + 'destination': destination})) + + def send_frame(self, frame): + """ + Sends a stomp frame. + @param frame: The stomp frame to send. + @type frame: L{stompclient.frame.Frame} + """ + if not self.connected: + raise RuntimeError("Not connected") + self.sock.send(frame.pack()) + + def _connect(self): + self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.sock.connect(self.addr) + self.connected = True + self.read_stopped.clear() + t = threading.Thread(target=self._read_loop, + name="client-receiver-%s" % hex(id(self))) + t.start() + + def _read_loop(self): + while self.connected: + r, w, e = select.select([self.sock], [], [], 0.1) + if r: + data = self.sock.recv(1024) + self.buffer.append(data) + for frame in self.buffer: + self.log.debug("Processing frame: %s" % frame) + self.received_frames.put(frame) + self.read_stopped.set() + # print "Read loop has been quit! for %s" % id(self) + + def disconnect(self): + self.send_frame(Frame('disconnect')) + + def close(self): + if not self.connected: + raise RuntimeError("Not connected") + self.connected = False + self.read_stopped.wait(timeout=0.5) + self.sock.close() \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/c7612bcf/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py b/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py new file mode 100644 index 0000000..4c61bf6 --- /dev/null +++ b/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py @@ -0,0 +1,35 @@ +#!/usr/bin/env python + +''' +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +''' +from coilmq.util import frames +from coilmq.util.frames import Frame + +from BaseStompServerTestCase import BaseStompServerTestCase + +class TestAgentStompResponses(BaseStompServerTestCase): + def test_mock_server_can_start(self): + c1 = self._new_client() + c1.connect() + + c1.subscribe('/topics/test') + + f = Frame(frames.MESSAGE, headers={'destination': '/topics/test'}, body='test-body') + self.server.topic_manager.send(f) + + \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/c7612bcf/ambari-common/src/test/python/coilmq/__init__.py ---------------------------------------------------------------------- diff --git a/ambari-common/src/test/python/coilmq/__init__.py b/ambari-common/src/test/python/coilmq/__init__.py new file mode 100644 index 0000000..c7010e0 --- /dev/null +++ b/ambari-common/src/test/python/coilmq/__init__.py @@ -0,0 +1,16 @@ +""" +Top-level CoilMQ package. +""" +__authors__ = ['"Hans Lellelid" <[email protected]>'] +__copyright__ = "Copyright 2009 Hans Lellelid" +__license__ = """Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License.""" http://git-wip-us.apache.org/repos/asf/ambari/blob/c7612bcf/ambari-common/src/test/python/coilmq/auth/__init__.py ---------------------------------------------------------------------- diff --git a/ambari-common/src/test/python/coilmq/auth/__init__.py b/ambari-common/src/test/python/coilmq/auth/__init__.py new file mode 100644 index 0000000..5ce3b61 --- /dev/null +++ b/ambari-common/src/test/python/coilmq/auth/__init__.py @@ -0,0 +1,34 @@ +""" +Authentication providers. + +Because authentication providers are instantiated and configured in the application scope +(and not in the request handler), the authenticator implementations must be thread-safe. +""" +import abc +__authors__ = ['"Hans Lellelid" <[email protected]>'] +__copyright__ = "Copyright 2009 Hans Lellelid" +__license__ = """Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License.""" + + +class Authenticator(object): + """ Abstract base class for authenticators. """ + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def authenticate(self, login, passcode): + """ + Authenticate the login and passcode. + + @return: Whether user is authenticated. + @rtype: C{bool} + """ http://git-wip-us.apache.org/repos/asf/ambari/blob/c7612bcf/ambari-common/src/test/python/coilmq/auth/simple.py ---------------------------------------------------------------------- diff --git a/ambari-common/src/test/python/coilmq/auth/simple.py b/ambari-common/src/test/python/coilmq/auth/simple.py new file mode 100644 index 0000000..cb092bb --- /dev/null +++ b/ambari-common/src/test/python/coilmq/auth/simple.py @@ -0,0 +1,101 @@ +""" +A simple config-file based authentication module. +""" +try: + from configparser import ConfigParser +except ImportError: + from ConfigParser import ConfigParser + ConfigParser.read_file = ConfigParser.readfp + +from coilmq.auth import Authenticator +from coilmq.config import config +from coilmq.exception import ConfigError + +__authors__ = ['"Hans Lellelid" <[email protected]>'] +__copyright__ = "Copyright 2009 Hans Lellelid" +__license__ = """Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License.""" + + +def make_simple(): + """ + Create a L{SimpleAuthenticator} instance using values read from coilmq configuration. + + @return: The configured L{SimpleAuthenticator} + @rtype: L{SimpleAuthenticator} + @raise ConfigError: If there is a configuration error. + """ + authfile = config.get('coilmq', 'auth.simple.file') + if not authfile: + raise ConfigError('Missing configuration parameter: auth.simple.file') + sa = SimpleAuthenticator() + sa.from_configfile(authfile) + return sa + + +class SimpleAuthenticator(Authenticator): + """ + A simple configfile-based authenticator. + + @ivar store: Authentication key-value store (of logins to passwords). + @type store: C{dict} of C{str} to C{str} + """ + + def __init__(self, store=None): + """ + Initialize the authenticator to use (optionally) specified C{dict} store. + + @param store: Authentication store, C{dict} of logins to passwords. + @type store: C{dict} of C{str} to C{str} + """ + if store is None: + store = {} + self.store = store + + def from_configfile(self, configfile): + """ + Initialize the authentication store from a "config"-style file. + + Auth "config" file is parsed with C{ConfigParser.RawConfigParser} and must contain + an [auth] section which contains the usernames (keys) and passwords (values). + + Example auth file:: + + [auth] + someuser = somepass + anotheruser = anotherpass + + @param configfile: Path to config file or file-like object. + @type configfile: C{any} + @raise ValueError: If file could not be read or does not contain [auth] section. + """ + cfg = ConfigParser() + if hasattr(configfile, 'read'): + cfg.read_file(configfile) + else: + filesread = cfg.read(configfile) + if not filesread: + raise ValueError('Could not parse auth file: %s' % configfile) + + if not cfg.has_section('auth'): + raise ValueError('Config file contains no [auth] section.') + + self.store = dict(cfg.items('auth')) + + def authenticate(self, login, passcode): + """ + Authenticate the login and passcode. + + @return: Whether provided login and password match values in store. + @rtype: C{bool} + """ + return login in self.store and self.store[login] == passcode http://git-wip-us.apache.org/repos/asf/ambari/blob/c7612bcf/ambari-common/src/test/python/coilmq/config/__init__.py ---------------------------------------------------------------------- diff --git a/ambari-common/src/test/python/coilmq/config/__init__.py b/ambari-common/src/test/python/coilmq/config/__init__.py new file mode 100644 index 0000000..4428cd4 --- /dev/null +++ b/ambari-common/src/test/python/coilmq/config/__init__.py @@ -0,0 +1,152 @@ +""" +Configuration support functionality. + +The global C{config} object (C{ConfigParser.SafeConfigParser} instance) is initialized +with default configuration from the defaults.cfg file, which is located in this package. +In order to ensure that the config contains custom values, you must call the C{init_config} +function during application initialization: + +from coilmq.config import config, init_config +init_config('/path/to/config.cfg') + +config.getint('listen_port') +""" +import os.path +import logging +import logging.config +import warnings +import io + +try: + from configparser import ConfigParser +except ImportError: + from ConfigParser import ConfigParser + + +from pkg_resources import resource_filename, resource_stream +__authors__ = ['"Hans Lellelid" <[email protected]>'] +__copyright__ = "Copyright 2009 Hans Lellelid" +__license__ = """Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License.""" + +config = ConfigParser() +config.read(os.path.join(os.path.dirname(__file__), 'defaults.cfg')) + + +def init_config(config_file=None): + """ + Initialize the configuration from a config file. + + The values in config_file will override those already loaded from the default + configuration file (defaults.cfg, in current package). + + This method does not setup logging. + + @param config_file: The path to a configuration file. + @type config_file: C{str} + + @raise ValueError: if the specified config_file could not be read. + @see: L{init_logging} + """ + global config + + if config_file and os.path.exists(config_file): + read = config.read([config_file]) + if not read: + raise ValueError( + "Could not read configuration from file: %s" % config_file) + + +def init_logging(logfile=None, loglevel=logging.INFO, configfile=None): + """ + Configures the logging using either basic filename + loglevel or passed config file path. + + This is performed separately from L{init_config()} in order to support the case where + logging should happen independent of (usu. *after*) other aspects of the configuration + initialization. For example, if logging may need to be initialized within a daemon + context. + + @param logfile: An explicitly specified logfile destination. If this is specified in addition + to default logging, a warning will be issued. + @type logfile: C{str} + + @param loglevel: Which level to use when logging to explicitly specified file or stdout. + @type loglevel: C{int} + + @param configfile: The path to a configuration file. This takes precedence over any explicitly + specified logfile/loglevel (but a warning will be logged if both are specified). + If the file is not specified or does not exist annd no logfile was specified, + then the default.cfg configuration file will be used to initialize logging. + @type configfile: C{str} + """ + # If a config file was specified, we will use that in place of the + # explicitly + use_configfile = False + if configfile and os.path.exists(configfile): + testcfg = ConfigParser() + read = testcfg.read(configfile) + use_configfile = (read and testcfg.has_section('loggers')) + + if use_configfile: + logging.config.fileConfig(configfile) + if logfile: + msg = "Config file conflicts with explicitly specified logfile; config file takes precedence." + logging.warn(msg) + else: + format = '%(asctime)s [%(threadName)s] %(name)s - %(levelname)s - %(message)s' + if logfile: + logging.basicConfig( + filename=logfile, level=loglevel, format=format) + else: + logging.basicConfig(level=loglevel, format=format) + + +def resolve_name(name): + """ + Resolve a dotted name to some object (usually class, module, or function). + + Supported naming formats include: + 1. path.to.module:method + 2. path.to.module.ClassName + + >>> resolve_name('coilmq.store.memory.MemoryQueue') + <class 'coilmq.store.memory.MemoryQueue'> + >>> t = resolve_name('coilmq.store.dbm.make_dbm') + >>> import inspect + >>> inspect.isfunction(t) + True + >>> t.__name__ + 'make_dbm' + + @param name: The dotted name (e.g. path.to.MyClass) + @type name: C{str} + + @return: The resolved object (class, callable, etc.) or None if not found. + """ + if ':' in name: + # Normalize foo.bar.baz:main to foo.bar.baz.main + # (since our logic below will handle that) + name = '%s.%s' % tuple(name.split(':')) + + name = name.split('.') + + used = name.pop(0) + found = __import__(used) + for n in name: + used = used + '.' + n + try: + found = getattr(found, n) + except AttributeError: + __import__(used) + found = getattr(found, n) + + return found http://git-wip-us.apache.org/repos/asf/ambari/blob/c7612bcf/ambari-common/src/test/python/coilmq/config/coilmq.cfg-sample ---------------------------------------------------------------------- diff --git a/ambari-common/src/test/python/coilmq/config/coilmq.cfg-sample b/ambari-common/src/test/python/coilmq/config/coilmq.cfg-sample new file mode 100644 index 0000000..165f1f6 --- /dev/null +++ b/ambari-common/src/test/python/coilmq/config/coilmq.cfg-sample @@ -0,0 +1,71 @@ +[coilmq] + +; Backend implementation configuration. +; ------------------------------------- +; Factories for implementations can be configured by passing a dotted-path. +; This is typically the path to a class or to a callable that returns a configured +; object (that implements the necessary functionality). The reason to use a callable +; is in cases when the object may need to be initialized with values from the +; app's configuration. (For example, a database storage engine will need to know +; the database connect URI.) + +; Configuration backend used for storage +; qstore.factory = coilmq.store.memory.MemoryQueue + +; Configure the scheduler implementations used +; scheduler.subscriber_priority_factory = coilmq.scheduler.FavorReliableSubscriberScheduler +; scheduler.queue_priority_factory = coilmq.scheduler.RandomQueueScheduler + +; Configure the authenticator class -- and any options +; auth.factory = coilmq.auth.simple:make_simple +; auth.simple.file = /path/to/authfile.ini + +; You can include custom logging configuration, if you like. If you do configure +; logging here, then any commandline logging arguments (e.g. --logfile) will be _IGNORED_. +; See the Python documentation for the log file format: +; http://docs.python.org/library/logging.html#configuration-file-format + +[loggers] +keys=root,coilmq + +[handlers] +keys=console,file,syslog + +[formatters] +keys=threaded,syslog + +[logger_root] +level=DEBUG +handlers=console,file,syslog + +[logger_coilmq] +level=DEBUG +handlers=console,file,syslog +qualname=coilmq +propagate=0 + +[handler_console] +class=StreamHandler +level=DEBUG +formatter=threaded +args=(sys.stdout,) + +[handler_syslog] +class=handlers.SysLogHandler +level=DEBUG +formatter=syslog +args=('/dev/log',) + +[handler_file] +class=FileHandler +level=DEBUG +formatter=threaded +args=('/tmp/coilmq.log', 'w') + +[formatter_syslog] +format=[%(threadName)s] %(name)s - %(levelname)s - %(message)s +datefmt= + +[formatter_threaded] +format=%(asctime)s [%(threadName)s] %(name)s - %(levelname)s - %(message)s +datefmt= \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/c7612bcf/ambari-common/src/test/python/coilmq/config/defaults.cfg ---------------------------------------------------------------------- diff --git a/ambari-common/src/test/python/coilmq/config/defaults.cfg b/ambari-common/src/test/python/coilmq/config/defaults.cfg new file mode 100644 index 0000000..3e8c27c --- /dev/null +++ b/ambari-common/src/test/python/coilmq/config/defaults.cfg @@ -0,0 +1,41 @@ +; --------------------------------------------------------------------------------------- +; Default configuration values for the coilmq broker. +; +; Do not modify this file; instead, copy over the coilmq.cfg-sample file in this +; directory and customize that to your needs. Those values will override any values +; here. +; --------------------------------------------------------------------------------------- + +[coilmq] + +listen_addr = 127.0.0.1 +listen_port = 61613 + +; Backend implementation configuration. +; ------------------------------------- +; Factories for implementations can be configured by passing a dotted-path. +; This is typically the path to a class or to a callable that returns a configured +; object (that implements the necessary functionality). The reason to use a callable +; is in cases when the object may need to be initialized with values from the +; app's configuration. (For example, a database storage engine will need to know +; the database connect URI.) + +; Configuration backend used for storage +qstore.factory = coilmq.store.memory.MemoryQueue + +; Configure some defaults for the DBM store +qstore.dbm.checkpoint_operations = 100 +qstore.dbm.checkpoint_timeout = 20 + +; Configure the scheduler implementations used +scheduler.subscriber_priority_factory = coilmq.scheduler.FavorReliableSubscriberScheduler +scheduler.queue_priority_factory = coilmq.scheduler.RandomQueueScheduler + +; Authentication configuration +; auth.factory = coilmq.auth.simple.make_simple +; auth.simple.file = coilmq/tests/resources/auth.ini + +; When running in debug mode how frequently we should print out stats/diagnostic information +; about queue sizes, memory, etc. +; (Set to 0 too disable the diagnostic information.) +debug.stats_poll_interval = 10.0 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/c7612bcf/ambari-common/src/test/python/coilmq/engine.py ---------------------------------------------------------------------- diff --git a/ambari-common/src/test/python/coilmq/engine.py b/ambari-common/src/test/python/coilmq/engine.py new file mode 100644 index 0000000..42352a3 --- /dev/null +++ b/ambari-common/src/test/python/coilmq/engine.py @@ -0,0 +1,94 @@ +from __future__ import absolute_import + +from coilmq.protocol import STOMP10 + +""" +Core STOMP server logic, abstracted from socket transport implementation. + +While this is abstracted from the socket transport implementation, it does +operate on the assumption that there is an available connection to send response +frames. + +We're also making some simplified assumptions here which may make this engine +impractical for [high-performance] use in specifically asynchronous frameworks. +More specifically, this class was not explicitly designed around async patterns, +meaning that it would likely be problematic to use with a framework like Twisted +if the underlying storage implementations were processor intensive (e.g. database +access). For the default memory storage engines, this shouldn't be a problem. + +This code is inspired by the design of the Ruby stompserver project, by +Patrick Hurley and Lionel Bouton. See http://stompserver.rubyforge.org/ +""" +import logging +from collections import defaultdict + +__authors__ = ['"Hans Lellelid" <[email protected]>'] +__copyright__ = "Copyright 2009 Hans Lellelid" +__license__ = """Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License.""" + + +class StompEngine(object): + """ + The engine provides the core business logic that we use to respond to STOMP protocol + messages. + + This class is transport-agnostic; it exposes methods that expect STOMP frames and + uses the attached connection to send frames to connected clients. + + @ivar connection: The connection (aka "protocol") backing this engine. + @type connection: L{coilmq.server.StompConnection} + + @ivar authenticator: An C{Authenticator} implementation to use. Setting this value + will implicitly cause authentication to be required. + @type authenticator: L{coilmq.auth.Authenticator} + + @ivar queue_manager: The C{QueueManager} implementation to use. + @type queue_manager: L{coilmq.queue.QueueManager} + + @ivar topic_manager: The C{TopicManager} implementation to use. + @type topic_manager: L{coilmq.topic.TopicManager} + + @ivar transactions: Active transactions for this connection. + @type transactions: C{dict} of C{str} to C{list} + + @ivar connected: Whether engine is connected. + @type connected: C{bool} + """ + + def __init__(self, connection, authenticator, queue_manager, topic_manager, protocol=STOMP10): + """ + @param connection: The stomp connection backing this engine. + @type connection: L{coilmq.server.StompConnection} + """ + self.log = logging.getLogger('%s.%s' % ( + self.__class__.__module__, self.__class__.__name__)) + self.connection = connection + self.authenticator = authenticator + self.queue_manager = queue_manager + self.topic_manager = topic_manager + self.connected = False + self.transactions = defaultdict(list) + + self.protocol = protocol(self) + + def process_frame(self, frame): + self.protocol.process_frame(frame) + + def unbind(self): + """ + Unbinds this connection from queue and topic managers (freeing up resources) + and resets state. + """ + self.connected = False + self.queue_manager.disconnect(self.connection) + self.topic_manager.disconnect(self.connection) http://git-wip-us.apache.org/repos/asf/ambari/blob/c7612bcf/ambari-common/src/test/python/coilmq/exception.py ---------------------------------------------------------------------- diff --git a/ambari-common/src/test/python/coilmq/exception.py b/ambari-common/src/test/python/coilmq/exception.py new file mode 100644 index 0000000..75f476c --- /dev/null +++ b/ambari-common/src/test/python/coilmq/exception.py @@ -0,0 +1,43 @@ +""" +Exception classes used by CoilMQ. + +CoilMQ exceptions extend C{RuntimeError} or other appropriate sub-classes. These will be +thrown if there is not a more appropriate error class already provided by builtins. +""" +__authors__ = ['"Hans Lellelid" <[email protected]>'] +__copyright__ = "Copyright 2009 Hans Lellelid" +__license__ = """Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License.""" + + +class ProtocolError(RuntimeError): + """ + Represents an error at the STOMP protocol layer. + """ + + +class ConfigError(RuntimeError): + """ + Represents an error in the configuration of the application. + """ + + +class AuthError(RuntimeError): + """ + Represents an authentication or authorization error. + """ + + +class ClientDisconnected(Exception): + """ + A signal that client has disconnected (so we shouldn't try to keep reading from the client). + """ http://git-wip-us.apache.org/repos/asf/ambari/blob/c7612bcf/ambari-common/src/test/python/coilmq/protocol/__init__.py ---------------------------------------------------------------------- diff --git a/ambari-common/src/test/python/coilmq/protocol/__init__.py b/ambari-common/src/test/python/coilmq/protocol/__init__.py new file mode 100644 index 0000000..da955ce --- /dev/null +++ b/ambari-common/src/test/python/coilmq/protocol/__init__.py @@ -0,0 +1,342 @@ +import abc +import uuid +import socket +import datetime + +from coilmq.exception import ProtocolError, AuthError +from coilmq.util import frames +from coilmq.util.frames import Frame, ErrorFrame, ReceiptFrame, ConnectedFrame +from coilmq.util.concurrency import CoilThreadingTimer + +SEND = 'SEND' +CONNECT = 'CONNECT' +MESSAGE = 'MESSAGE' +ERROR = 'ERROR' +CONNECTED = 'CONNECTED' +SUBSCRIBE = 'SUBSCRIBE' +UNSUBSCRIBE = 'UNSUBSCRIBE' +BEGIN = 'BEGIN' +COMMIT = 'COMMIT' +ABORT = 'ABORT' +ACK = 'ACK' +DISCONNECT = 'DISCONNECT' + +VALID_COMMANDS = ['message', 'connect', 'connected', 'error', 'send', + 'subscribe', 'unsubscribe', 'begin', 'commit', 'abort', 'ack', 'disconnect', 'nack', 'stomp'] + + +class STOMP(object): + + __metaclass__ = abc.ABCMeta + + def __init__(self, engine): + self.engine = engine + + def stomp(self, frame): + self.connect(frame) + + @abc.abstractmethod + def process_frame(self, frame): + raise NotImplementedError + + @abc.abstractmethod + def connect(self, frame): + raise NotImplementedError + + @abc.abstractmethod + def send(self, frame): + raise NotImplementedError + + @abc.abstractmethod + def subscribe(self, frame): + raise NotImplementedError + + @abc.abstractmethod + def unsubscribe(self, frame): + raise NotImplementedError + + @abc.abstractmethod + def begin(self, frame): + raise NotImplementedError + + @abc.abstractmethod + def commit(self, frame): + raise NotImplementedError + + @abc.abstractmethod + def abort(self, frame): + raise NotImplementedError + + @abc.abstractmethod + def ack(self, frame): + raise NotImplementedError + + @abc.abstractmethod + def disconnect(self, frame): + raise NotImplementedError + + +class STOMP10(STOMP): + + def process_frame(self, frame): + """ + Dispatches a received frame to the appropriate internal method. + + @param frame: The frame that was received. + @type frame: C{stompclient.frame.Frame} + """ + cmd_method = frame.cmd.lower() + + if not cmd_method in VALID_COMMANDS: + raise ProtocolError("Invalid STOMP command: {}".format(frame.cmd)) + + method = getattr(self, cmd_method, None) + + if not self.engine.connected and method not in (self.connect, self.stomp): + raise ProtocolError("Not connected.") + + try: + transaction = frame.headers.get('transaction') + if not transaction or method in (self.begin, self.commit, self.abort): + method(frame) + else: + if not transaction in self.engine.transactions: + raise ProtocolError( + "Invalid transaction specified: %s" % transaction) + self.engine.transactions[transaction].append(frame) + except Exception as e: + self.engine.log.error("Error processing STOMP frame: %s" % e) + self.engine.log.exception(e) + try: + self.engine.connection.send_frame(ErrorFrame(str(e), str(e))) + except Exception as e: # pragma: no cover + self.engine.log.error("Could not send error frame: %s" % e) + self.engine.log.exception(e) + else: + # The protocol is not especially clear here (not sure why I'm surprised) + # about the expected behavior WRT receipts and errors. We will assume that + # the RECEIPT frame should not be sent if there is an error frame. + # Also we'll assume that a transaction should not preclude sending the receipt + # frame. + # import pdb; pdb.set_trace() + if frame.headers.get('receipt') and method != self.connect: + self.engine.connection.send_frame(ReceiptFrame( + receipt=frame.headers.get('receipt'))) + + def connect(self, frame, response=None): + """ + Handle CONNECT command: Establishes a new connection and checks auth (if applicable). + """ + self.engine.log.debug("CONNECT") + + if self.engine.authenticator: + login = frame.headers.get('login') + passcode = frame.headers.get('passcode') + if not self.engine.authenticator.authenticate(login, passcode): + raise AuthError("Authentication failed for %s" % login) + + self.engine.connected = True + + response = response or Frame(frames.CONNECTED) + response.headers['session'] = uuid.uuid4() + + # TODO: Do we want to do anything special to track sessions? + # (Actually, I don't think the spec actually does anything with this at all.) + self.engine.connection.send_frame(response) + + def send(self, frame): + """ + Handle the SEND command: Delivers a message to a queue or topic (default). + """ + dest = frame.headers.get('destination') + if not dest: + raise ProtocolError('Missing destination for SEND command.') + + if dest.startswith('/queue/'): + self.engine.queue_manager.send(frame) + else: + self.engine.topic_manager.send(frame) + + def subscribe(self, frame): + """ + Handle the SUBSCRIBE command: Adds this connection to destination. + """ + ack = frame.headers.get('ack') + reliable = ack and ack.lower() == 'client' + + self.engine.connection.reliable_subscriber = reliable + + dest = frame.headers.get('destination') + if not dest: + raise ProtocolError('Missing destination for SUBSCRIBE command.') + + if dest.startswith('/queue/'): + self.engine.queue_manager.subscribe(self.engine.connection, dest) + else: + self.engine.topic_manager.subscribe(self.engine.connection, dest) + + def unsubscribe(self, frame): + """ + Handle the UNSUBSCRIBE command: Removes this connection from destination. + """ + dest = frame.headers.get('destination') + if not dest: + raise ProtocolError('Missing destination for UNSUBSCRIBE command.') + + if dest.startswith('/queue/'): + self.engine.queue_manager.unsubscribe(self.engine.connection, dest) + else: + self.engine.topic_manager.unsubscribe(self.engine.connection, dest) + + def begin(self, frame): + """ + Handles BEGING command: Starts a new transaction. + """ + if not frame.transaction: + raise ProtocolError("Missing transaction for BEGIN command.") + + self.engine.transactions[frame.transaction] = [] + + def commit(self, frame): + """ + Handles COMMIT command: Commits specified transaction. + """ + if not frame.transaction: + raise ProtocolError("Missing transaction for COMMIT command.") + + if not frame.transaction in self.engine.transactions: + raise ProtocolError("Invalid transaction: %s" % frame.transaction) + + for tframe in self.engine.transactions[frame.transaction]: + del tframe.headers['transaction'] + self.process_frame(tframe) + + self.engine.queue_manager.clear_transaction_frames( + self.engine.connection, frame.transaction) + del self.engine.transactions[frame.transaction] + + def abort(self, frame): + """ + Handles ABORT command: Rolls back specified transaction. + """ + if not frame.transaction: + raise ProtocolError("Missing transaction for ABORT command.") + + if not frame.transaction in self.engine.transactions: + raise ProtocolError("Invalid transaction: %s" % frame.transaction) + + self.engine.queue_manager.resend_transaction_frames( + self.engine.connection, frame.transaction) + del self.engine.transactions[frame.transaction] + + def ack(self, frame): + """ + Handles the ACK command: Acknowledges receipt of a message. + """ + if not frame.message_id: + raise ProtocolError("No message-id specified for ACK command.") + self.engine.queue_manager.ack(self.engine.connection, frame) + + def disconnect(self, frame): + """ + Handles the DISCONNECT command: Unbinds the connection. + + Clients are supposed to send this command, but in practice it should not be + relied upon. + """ + self.engine.log.debug("Disconnect") + self.engine.unbind() + + +class STOMP11(STOMP10): + + SUPPORTED_VERSIONS = {'1.0', '1.1'} + + def __init__(self, engine, send_heartbeat_interval=100, receive_heartbeat_interval=100, *args, **kwargs): + super(STOMP11, self).__init__(engine) + self.last_hb = datetime.datetime.now() + self.last_hb_sent = datetime.datetime.now() + self.timer = CoilThreadingTimer() + + # flags to control heartbeating + self.send_hb = self.receive_hb = False + + self.send_heartbeat_interval = datetime.timedelta(milliseconds=send_heartbeat_interval) + self.receive_heartbeat_interval = datetime.timedelta(milliseconds=receive_heartbeat_interval) + + def enable_heartbeat(self, cx, cy, response): + if self.send_heartbeat_interval and cy: + self.send_heartbeat_interval = max(self.send_heartbeat_interval, datetime.timedelta(milliseconds=cy)) + self.timer.schedule(max(self.send_heartbeat_interval, datetime.timedelta(milliseconds=cy)).total_seconds(), self.send_heartbeat) + if self.receive_heartbeat_interval and cx: + self.timer.schedule(max(self.send_heartbeat_interval, datetime.timedelta(milliseconds=cx)).total_seconds(), + self.receive_heartbeat) + self.timer.start() + response.headers['heart-beat'] = '{0},{1}'.format(int(self.send_heartbeat_interval.microseconds / 1000), + int(self.receive_heartbeat_interval.microseconds / 1000)) + + def disable_heartbeat(self): + self.timer.stop() + + def send_heartbeat(self): + # screw it, just send an error frame + self.engine.connection.send_frame(ErrorFrame('heartbeat')) + + def receive_heartbeat(self): + ago = datetime.datetime.now() - self.last_hb + if ago > self.receive_heartbeat_interval: + self.engine.log.debug("No heartbeat was received for {0} seconds".format(ago.total_seconds())) + self.engine.unbind() + + def connect(self, frame, response=None): + connected_frame = Frame(frames.CONNECTED) + self._negotiate_protocol(frame, connected_frame) + heart_beat = frame.headers.get('heart-beat', '0,0') + if heart_beat: + self.enable_heartbeat(*map(int, heart_beat.split(',')), response=connected_frame) + super(STOMP11, self).connect(frame, response=connected_frame) + + def nack(self, frame): + """ + Handles the NACK command: Unacknowledges receipt of a message. + For now, this is just a placeholder to implement this version of the protocol + """ + if not frame.headers.get('message-id'): + raise ProtocolError("No message-id specified for NACK command.") + if not frame.headers.get('subscription'): + raise ProtocolError("No subscription specified for NACK command.") + + def _negotiate_protocol(self, frame, response): + client_versions = frame.headers.get('accept-version') + if not client_versions: + raise ProtocolError('No version specified') + common = set(client_versions.split(',')) & self.SUPPORTED_VERSIONS + if not common: + versions = ','.join(self.SUPPORTED_VERSIONS) + self.engine.connection.send_frame(Frame( + frames.ERROR, + headers={'version': versions, 'content-type': frames.TEXT_PLAIN}, + body='Supported protocol versions are {0}'.format(versions) + )) + else: + response.headers['version'] = max(common) + protocol_class = PROTOCOL_MAP[response.headers['version']] + if type(self) is not protocol_class: + self.engine.protocol = protocol_class(self.engine) + self.engine.protocol.connect(frame, response=response) + + +class STOMP12(STOMP11): + + SUPPORTED_VERSIONS = STOMP11.SUPPORTED_VERSIONS.union({'1.2', }) + + def connect(self, frame, response=None): + host = frame.headers.get('host') + if not host: + raise ProtocolError('"host" header is required') + if host != socket.getfqdn(): + raise ProtocolError('Virtual hosting is not supported or host is unknown') + super(STOMP12, self).connect(frame, response) + + +PROTOCOL_MAP = {'1.0': STOMP10, '1.1': STOMP11, '1.2': STOMP12} http://git-wip-us.apache.org/repos/asf/ambari/blob/c7612bcf/ambari-common/src/test/python/coilmq/queue.py ---------------------------------------------------------------------- diff --git a/ambari-common/src/test/python/coilmq/queue.py b/ambari-common/src/test/python/coilmq/queue.py new file mode 100644 index 0000000..0a1160b --- /dev/null +++ b/ambari-common/src/test/python/coilmq/queue.py @@ -0,0 +1,376 @@ +""" +Queue manager, queue implementation, and supporting classes. + +This code is inspired by the design of the Ruby stompserver project, by +Patrick Hurley and Lionel Bouton. See http://stompserver.rubyforge.org/ +""" +import logging +import threading +import uuid +from collections import defaultdict + +from coilmq.scheduler import FavorReliableSubscriberScheduler, RandomQueueScheduler +from coilmq.util.concurrency import synchronized + +__authors__ = ['"Hans Lellelid" <[email protected]>'] +__copyright__ = "Copyright 2009 Hans Lellelid" +__license__ = """Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License.""" + +lock = threading.RLock() + + +class QueueManager(object): + """ + Class that manages distribution of messages to queue subscribers. + + This class uses C{threading.RLock} to guard the public methods. This is probably + a bit excessive, given 1) the actomic nature of basic C{dict} read/write operations + and 2) the fact that most of the internal data structures are keying off of the + STOMP connection, which is going to be thread-isolated. That said, this seems like + the technically correct approach and should increase the chance of this code being + portable to non-GIL systems. + + @ivar store: The queue storage backend to use. + @type store: L{coilmq.store.QueueStore} + + @ivar subscriber_scheduler: The scheduler that chooses which subscriber to send + messages to. + @type subscriber_scheduler: L{coilmq.scheduler.SubscriberPriorityScheduler} + + @ivar queue_scheduler: The scheduler that chooses which queue to select for sending + backlogs for a single connection. + @type queue_scheduler: L{coilmq.scheduler.QueuePriorityScheduler} + + @ivar _queues: A dict of registered queues, keyed by destination. + @type _queues: C{dict} of C{str} to C{set} of L{coilmq.server.StompConnection} + + @ivar _pending: All messages waiting for ACK from clients. + @type _pending: C{dict} of L{coilmq.server.StompConnection} to C{stompclient.frame.Frame} + + @ivar _transaction_frames: Frames that have been ACK'd within a transaction. + @type _transaction_frames: C{dict} of L{coilmq.server.StompConnection} to C{dict} of C{str} to C{stompclient.frame.Frame} + """ + + def __init__(self, store, subscriber_scheduler=None, queue_scheduler=None): + """ + @param store: The queue storage backend. + @type store: L{coilmq.store.QueueStore} + + @param subscriber_scheduler: The scheduler that chooses which subscriber to send + messages to. + @type subscriber_scheduler: L{coilmq.scheduler.SubscriberPriorityScheduler} + + @param queue_scheduler: The scheduler that chooses which queue to select for sending + backlogs for a single connection. + @type queue_scheduler: L{coilmq.scheduler.QueuePriorityScheduler} + """ + self.log = logging.getLogger( + '%s.%s' % (__name__, self.__class__.__name__)) + + # Use default schedulers, if they're not specified + if subscriber_scheduler is None: + subscriber_scheduler = FavorReliableSubscriberScheduler() + + if queue_scheduler is None: + queue_scheduler = RandomQueueScheduler() + + # This lock var is required by L{synchronized} decorator. + self._lock = threading.RLock() + + self.store = store + self.subscriber_scheduler = subscriber_scheduler + self.queue_scheduler = queue_scheduler + + self._queues = defaultdict(set) + self._transaction_frames = defaultdict(lambda: defaultdict(list)) + self._pending = {} + + @synchronized(lock) + def close(self): + """ + Closes all resources/backends associated with this queue manager. + """ + self.log.info("Shutting down queue manager.") + if hasattr(self.store, 'close'): + self.store.close() + + if hasattr(self.subscriber_scheduler, 'close'): + self.subscriber_scheduler.close() + + if hasattr(self.queue_scheduler, 'close'): + self.queue_scheduler.close() + + @synchronized(lock) + def subscriber_count(self, destination=None): + """ + Returns a count of the number of subscribers. + + If destination is specified then it only returns count of subscribers + for that specific destination. + + @param destination: The optional topic/queue destination (e.g. '/queue/foo') + @type destination: C{str} + """ + if destination: + return len(self._queues[destination]) + else: + # total them up + total = 0 + for k in self._queues.keys(): + total += len(self._queues[k]) + return total + + @synchronized(lock) + def subscribe(self, connection, destination): + """ + Subscribes a connection to the specified destination (topic or queue). + + @param connection: The connection to subscribe. + @type connection: L{coilmq.server.StompConnection} + + @param destination: The topic/queue destination (e.g. '/queue/foo') + @type destination: C{str} + """ + self.log.debug("Subscribing %s to %s" % (connection, destination)) + self._queues[destination].add(connection) + self._send_backlog(connection, destination) + + @synchronized(lock) + def unsubscribe(self, connection, destination): + """ + Unsubscribes a connection from a destination (topic or queue). + + @param connection: The client connection to unsubscribe. + @type connection: L{coilmq.server.StompConnection} + + @param destination: The topic/queue destination (e.g. '/queue/foo') + @type destination: C{str} + """ + self.log.debug("Unsubscribing %s from %s" % (connection, destination)) + if connection in self._queues[destination]: + self._queues[destination].remove(connection) + + if not self._queues[destination]: + del self._queues[destination] + + @synchronized(lock) + def disconnect(self, connection): + """ + Removes a subscriber connection, ensuring that any pending commands get requeued. + + @param connection: The client connection to unsubscribe. + @type connection: L{coilmq.server.StompConnection} + """ + self.log.debug("Disconnecting %s" % connection) + if connection in self._pending: + pending_frame = self._pending[connection] + self.store.requeue(pending_frame.headers.get( + 'destination'), pending_frame) + del self._pending[connection] + + for dest in list(self._queues.keys()): + if connection in self._queues[dest]: + self._queues[dest].remove(connection) + if not self._queues[dest]: + # This won't trigger RuntimeError, since we're using keys() + del self._queues[dest] + + @synchronized(lock) + def send(self, message): + """ + Sends a MESSAGE frame to an eligible subscriber connection. + + Note that this method will modify the incoming message object to + add a message-id header (if not present) and to change the command + to 'MESSAGE' (if it is not). + + @param message: The message frame. + @type message: C{stompclient.frame.Frame} + """ + dest = message.headers.get('destination') + if not dest: + raise ValueError( + "Cannot send frame with no destination: %s" % message) + + message.cmd = 'message' + + message.headers.setdefault('message-id', str(uuid.uuid4())) + + # Grab all subscribers for this destination that do not have pending + # frames + subscribers = [s for s in self._queues[dest] + if s not in self._pending] + + if not subscribers: + self.log.debug( + "No eligible subscribers; adding message %s to queue %s" % (message, dest)) + self.store.enqueue(dest, message) + else: + selected = self.subscriber_scheduler.choice(subscribers, message) + self.log.debug("Delivering message %s to subscriber %s" % + (message, selected)) + self._send_frame(selected, message) + + @synchronized(lock) + def ack(self, connection, frame, transaction=None): + """ + Acknowledge receipt of a message. + + If the `transaction` parameter is non-null, the frame being ack'd + will be queued so that it can be requeued if the transaction + is rolled back. + + @param connection: The connection that is acknowledging the frame. + @type connection: L{coilmq.server.StompConnection} + + @param frame: The frame being acknowledged. + + """ + self.log.debug("ACK %s for %s" % (frame, connection)) + + if connection in self._pending: + pending_frame = self._pending[connection] + # Make sure that the frame being acknowledged matches + # the expected frame + if pending_frame.headers.get('message-id') != frame.headers.get('message-id'): + self.log.warning( + "Got a ACK for unexpected message-id: %s" % frame.message_id) + self.store.requeue(pending_frame.destination, pending_frame) + # (The pending frame will be removed further down) + + if transaction is not None: + self._transaction_frames[connection][ + transaction].append(pending_frame) + + del self._pending[connection] + self._send_backlog(connection) + + else: + self.log.debug("No pending messages for %s" % connection) + + @synchronized(lock) + def resend_transaction_frames(self, connection, transaction): + """ + Resend the messages that were ACK'd in specified transaction. + + This is called by the engine when there is an abort command. + + @param connection: The client connection that aborted the transaction. + @type connection: L{coilmq.server.StompConnection} + + @param transaction: The transaction id (which was aborted). + @type transaction: C{str} + """ + for frame in self._transaction_frames[connection][transaction]: + self.send(frame) + + @synchronized(lock) + def clear_transaction_frames(self, connection, transaction): + """ + Clears out the queued ACK frames for specified transaction. + + This is called by the engine when there is a commit command. + + @param connection: The client connection that committed the transaction. + @type connection: L{coilmq.server.StompConnection} + + @param transaction: The transaction id (which was committed). + @type transaction: C{str} + """ + try: + del self._transaction_frames[connection][transaction] + except KeyError: + # There may not have been any ACK frames for this transaction. + pass + + def _send_backlog(self, connection, destination=None): + """ + Sends any queued-up messages for the (optionally) specified destination to connection. + + If the destination is not provided, a destination is chosen using the + L{QueueManager.queue_scheduler} scheduler algorithm. + + (This method assumes it is being called from within a lock-guarded public + method.) + + @param connection: The client connection. + @type connection: L{coilmq.server.StompConnection} + + @param destination: The topic/queue destination (e.g. '/queue/foo') + @type destination: C{str} + + @raise Exception: if the underlying connection object raises an error, the message + will be re-queued and the error will be re-raised. + """ + if destination is None: + # Find all destinations that have frames and that contain this + # connection (subscriber). + eligible_queues = dict([(dest, q) for (dest, q) in self._queues.items() + if connection in q and self.store.has_frames(dest)]) + destination = self.queue_scheduler.choice( + eligible_queues, connection) + if destination is None: + self.log.debug( + "No eligible queues (with frames) for subscriber %s" % connection) + return + + self.log.debug("Sending backlog to %s for destination %s" % + (connection, destination)) + if connection.reliable_subscriber: + # only send one message (waiting for ack) + frame = self.store.dequeue(destination) + if frame: + try: + self._send_frame(connection, frame) + except Exception as x: + self.log.error( + "Error sending message %s (requeueing): %s" % (frame, x)) + self.store.requeue(destination, frame) + raise + else: + for frame in self.store.frames(destination): + try: + self._send_frame(connection, frame) + except Exception as x: + self.log.error( + "Error sending message %s (requeueing): %s" % (frame, x)) + self.store.requeue(destination, frame) + raise + + def _send_frame(self, connection, frame): + """ + Sends a frame to a specific subscriber connection. + + (This method assumes it is being called from within a lock-guarded public + method.) + + @param connection: The subscriber connection object to send to. + @type connection: L{coilmq.server.StompConnection} + + @param frame: The frame to send. + @type frame: L{stompclient.frame.Frame} + """ + assert connection is not None + assert frame is not None + + self.log.debug("Delivering frame %s to connection %s" % + (frame, connection)) + + if connection.reliable_subscriber: + if connection in self._pending: + raise RuntimeError("Connection already has a pending frame.") + self.log.debug( + "Tracking frame %s as pending for connection %s" % (frame, connection)) + self._pending[connection] = frame + + connection.send_frame(frame) http://git-wip-us.apache.org/repos/asf/ambari/blob/c7612bcf/ambari-common/src/test/python/coilmq/scheduler.py ---------------------------------------------------------------------- diff --git a/ambari-common/src/test/python/coilmq/scheduler.py b/ambari-common/src/test/python/coilmq/scheduler.py new file mode 100644 index 0000000..4ac7c3e --- /dev/null +++ b/ambari-common/src/test/python/coilmq/scheduler.py @@ -0,0 +1,141 @@ +""" +Classes that provide delivery scheduler implementations. + +The default implementation used by the system for determining which subscriber +(connection) should receive a message is simply a random choice but favoring +reliable subscribers. Developers can write their own delivery schedulers, which +should implement the methods defined in L{QueuePriorityScheduler} if they would +like to customize the behavior. +""" +import abc +import random + +__authors__ = ['"Hans Lellelid" <[email protected]>'] +__copyright__ = "Copyright 2009 Hans Lellelid" +__license__ = """Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License.""" + + +class SubscriberPriorityScheduler(object): + """ Abstract base class for choosing which recipient (subscriber) should receive a message. """ + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def choice(self, subscribers, message): + """ + Chooses which subscriber (from list) should recieve specified message. + + @param subscribers: Collection of subscribed connections eligible to receive message. + @type subscribers: C{list} of L{coilmq.server.StompConnection} + + @param message: The message to be delivered. + @type message: L{stompclient.frame.Frame} + + @return: A selected subscriber from the list or None if no subscriber could be chosen (e.g. list is empty). + @rtype: L{coilmq.server.StompConnection} + """ + + +class QueuePriorityScheduler(object): + """ + Abstract base class for objects that provide a way to prioritize the queues. + """ + + def choice(self, queues, connection): + """ + Choose which queue to select for messages to specified connection. + + @param queues: A C{dict} mapping queue name to queues (sets of frames) to which + specified connection is subscribed. + @type queues: C{dict} of C{str} to C{set} of L{stompclient.frame.Frame} + + @param connection: The connection that is going to be delivered the frame(s). + @type connection: L{coilmq.server.StompConnection} + + @return: A selected queue destination (name) or None if queues C{dict} is empty. + @rtype: C{str} + """ + raise NotImplementedError + + +class RandomSubscriberScheduler(SubscriberPriorityScheduler): + """ A delivery scheduler that chooses a random subscriber for message recipient. """ + + def choice(self, subscribers, message): + """ + Chooses a random connection from subscribers to deliver specified message. + + @param subscribers: Collection of subscribed connections to destination. + @type subscribers: C{list} of L{coilmq.server.StompConnection} + + @param message: The message to be delivered. + @type message: L{stompclient.frame.Frame} + + @return: A random subscriber from the list or None if list is empty. + @rtype: L{coilmq.server.StompConnection} + """ + if not subscribers: + return None + return random.choice(subscribers) + + +class FavorReliableSubscriberScheduler(SubscriberPriorityScheduler): + """ + A random delivery scheduler which prefers reliable subscribers. + """ + + def choice(self, subscribers, message): + """ + Choose a random connection, favoring those that are reliable from + subscriber pool to deliver specified message. + + @param subscribers: Collection of subscribed connections to destination. + @type subscribers: C{list} of L{coilmq.server.StompConnection} + + @param message: The message to be delivered. + @type message: L{stompclient.frame.Frame} + + @return: A random subscriber from the list or None if list is empty. + @rtype: L{coilmq.server.StompConnection} + """ + if not subscribers: + return None + reliable_subscribers = [ + s for s in subscribers if s.reliable_subscriber] + if reliable_subscribers: + return random.choice(reliable_subscribers) + else: + return random.choice(subscribers) + + +class RandomQueueScheduler(QueuePriorityScheduler): + """ + Implementation of L{QueuePriorityScheduler} that selects a random queue from the list. + """ + + def choice(self, queues, connection): + """ + Chooses a random queue for messages to specified connection. + + @param queues: A C{dict} mapping queue name to queues (sets of frames) to which + specified connection is subscribed. + @type queues: C{dict} of C{str} to C{set} of L{stompclient.frame.Frame} + + @param connection: The connection that is going to be delivered the frame(s). + @type connection: L{coilmq.server.StompConnection} + + @return: A random queue destination or None if list is empty. + @rtype: C{str} + """ + if not queues: + return None + return random.choice(list(queues.keys())) http://git-wip-us.apache.org/repos/asf/ambari/blob/c7612bcf/ambari-common/src/test/python/coilmq/server/__init__.py ---------------------------------------------------------------------- diff --git a/ambari-common/src/test/python/coilmq/server/__init__.py b/ambari-common/src/test/python/coilmq/server/__init__.py new file mode 100644 index 0000000..1a7c52a --- /dev/null +++ b/ambari-common/src/test/python/coilmq/server/__init__.py @@ -0,0 +1,44 @@ +""" +Package of available server implementations and shared functionality/interfaces. + +CoilMQ is designed for the Python StompServer reference socket server (specifically +multi-threaded); however, some alternative implementation examples are also provided. +""" +import abc + +__authors__ = ['"Hans Lellelid" <[email protected]>'] +__license__ = """Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License.""" + + +class StompConnection(object): + """ + An "interface" for server implementation classes to "implement". + + This class serves primarily as a means to document the API that CoilMQ will expect + the connection object to implement. + + @ivar reliable_subscriber: Whether this client will ACK all messages. + @type reliable_subscriber: C{bool} + """ + __metaclass__ = abc.ABCMeta + + reliable_subscriber = False + + @abc.abstractmethod + def send_frame(self, frame): + """ + Uses this connection implementation to send the specified frame to a connected client. + + @param frame: The STOMP frame to send. + @type frame: C{stompclient.frame.Frame} + """ http://git-wip-us.apache.org/repos/asf/ambari/blob/c7612bcf/ambari-common/src/test/python/coilmq/server/socket_server.py ---------------------------------------------------------------------- diff --git a/ambari-common/src/test/python/coilmq/server/socket_server.py b/ambari-common/src/test/python/coilmq/server/socket_server.py new file mode 100644 index 0000000..b4f081e --- /dev/null +++ b/ambari-common/src/test/python/coilmq/server/socket_server.py @@ -0,0 +1,192 @@ +""" +The default/recommended SocketServer-based server implementation. +""" +import logging +import socket +import threading +try: + from socketserver import BaseRequestHandler, TCPServer, ThreadingMixIn +except ImportError: + from SocketServer import BaseRequestHandler, TCPServer, ThreadingMixIn + + +from coilmq.util.frames import FrameBuffer +from coilmq.server import StompConnection +from coilmq.engine import StompEngine +from coilmq.exception import ClientDisconnected + +__authors__ = ['"Hans Lellelid" <[email protected]>'] +__copyright__ = "Copyright 2009 Hans Lellelid" +__license__ = """Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License.""" + + +class StompRequestHandler(BaseRequestHandler, StompConnection): + """ + Class that will be instantiated to handle STOMP connections. + + This class will be instantiated once per connection to the server. In a multi-threaded + context, that means that instances of this class are scoped to a single thread. It should + be noted that while the L{coilmq.engine.StompEngine} instance will be thread-local, the + storage containers configured into the engine are not thread-local (and hence must be + thread-safe). + + @ivar buffer: A StompBuffer instance which buffers received data (to ensure we deal with + complete STOMP messages. + @type buffer: C{stompclient.util.FrameBuffer} + + @ivar engine: The STOMP protocol engine. + @type engine: L{coilmq.engine.StompEngine} + + @ivar debug: Whether to enable extra-verbose debug logging. (Will be logged at debug level.) + @type debug: C{bool} + """ + + def setup(self): + if self.server.timeout is not None: + self.request.settimeout(self.server.timeout) + self.debug = False + self.log = logging.getLogger('%s.%s' % (self.__module__, self.__class__.__name__)) + self.buffer = FrameBuffer() + self.engine = StompEngine(connection=self, + authenticator=self.server.authenticator, + queue_manager=self.server.queue_manager, + topic_manager=self.server.topic_manager, + protocol=self.server.protocol) + + def handle(self): + """ + Handle a new socket connection. + """ + # self.request is the TCP socket connected to the client + try: + while not self.server._shutdown_request_event.is_set(): + try: + data = self.request.recv(8192) + if not data: + break + if self.debug: + self.log.debug("RECV: %r" % data) + self.buffer.append(data) + + for frame in self.buffer: + self.log.debug("Processing frame: %s" % frame) + self.engine.process_frame(frame) + if not self.engine.connected: + raise ClientDisconnected() + except socket.timeout: # pragma: no cover + pass + except ClientDisconnected: + self.log.debug("Client disconnected, discontinuing read loop.") + except Exception as e: # pragma: no cover + self.log.error("Error receiving data (unbinding): %s" % e) + self.engine.unbind() + raise + + def finish(self): + """ + Normal (non-error) termination of request. + + Unbinds the engine. + @see: L{coilmq.engine.StompEngine.unbind} + """ + self.engine.unbind() + + def send_frame(self, frame): + """ Sends a frame to connected socket client. + + @param frame: The frame to send. + @type frame: C{stompclient.frame.Frame} + """ + packed = frame.pack() + if self.debug: # pragma: no cover + self.log.debug("SEND: %r" % packed) + self.request.sendall(packed) + + +class StompServer(TCPServer): + """ + Subclass of C{StompServer.TCPServer} to handle new connections with + instances of L{StompRequestHandler}. + + @ivar authenticator: The authenticator to use. + @type authenticator: L{coilmq.auth.Authenticator} + + @ivar queue_manager: The queue manager to use. + @type queue_manager: L{coilmq.queue.QueueManager} + + @ivar topic_manager: The topic manager to use. + @type topic_manager: L{coilmq.topic.TopicManager} + """ + + # This causes the SO_REUSEADDR option to be set on the socket, allowing + # server to rebind to the same address (w/o waiting for connections to + # leave TIME_WAIT after unclean disconnect). + allow_reuse_address = True + + def __init__(self, server_address, RequestHandlerClass=None, timeout=3.0, + authenticator=None, queue_manager=None, topic_manager=None, protocol=None): + """ + Extension to C{TCPServer} constructor to provide mechanism for providing implementation classes. + + @param server_address: The (address,port) C{tuple} + @param RequestHandlerClass: The class to use for handling requests. + @param timeout: The timeout for the underlying socket. + @keyword authenticator: The configure L{coilmq.auth.Authenticator} object to use. + @keyword queue_manager: The configured L{coilmq.queue.QueueManager} object to use. + @keyword topic_manager: The configured L{coilmq.topic.TopicManager} object to use. + """ + self.log = logging.getLogger('%s.%s' % ( + self.__module__, self.__class__.__name__)) + if not RequestHandlerClass: + RequestHandlerClass = StompRequestHandler + self.timeout = timeout + self.authenticator = authenticator + self.queue_manager = queue_manager + self.topic_manager = topic_manager + self.protocol = protocol + self._serving_event = threading.Event() + self._shutdown_request_event = threading.Event() + TCPServer.__init__(self, server_address, RequestHandlerClass) + + def server_close(self): + """ + Closes the socket server and any associated resources. + """ + self.log.debug("Closing the socket server connection.") + TCPServer.server_close(self) + self.queue_manager.close() + self.topic_manager.close() + if hasattr(self.authenticator, 'close'): + self.authenticator.close() + self.shutdown() + + def shutdown(self): + if self._serving_event.is_set(): + self._shutdown_request_event.set() + self._serving_event.clear() + TCPServer.shutdown(self) + + def serve_forever(self, poll_interval=0.5): + """Handle one request at a time until shutdown. + + Polls for shutdown every poll_interval seconds. Ignores + self.timeout. If you need to do periodic tasks, do them in + another thread. + """ + self._serving_event.set() + self._shutdown_request_event.clear() + TCPServer.serve_forever(self, poll_interval=poll_interval) + + +class ThreadedStompServer(ThreadingMixIn, StompServer): + pass
