AMBARI-20645. Integrate coilmq stomp server as a mock server for ambari-agent 
unittests.  (aonishuk)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/c7612bcf
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/c7612bcf
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/c7612bcf

Branch: refs/heads/branch-3.0-perf
Commit: c7612bcf9b3e91e75416b484ea03bfc2169e1c29
Parents: b4e15b7
Author: Andrew Onishuk <[email protected]>
Authored: Sat Apr 1 10:07:38 2017 +0300
Committer: Andrew Onishuk <[email protected]>
Committed: Sat Apr 1 10:07:38 2017 +0300

----------------------------------------------------------------------
 NOTICE.txt                                      |  10 +-
 .../ambari_agent/BaseStompServerTestCase.py     | 225 +++++++++++
 .../ambari_agent/TestAgentStompResponses.py     |  35 ++
 .../src/test/python/coilmq/__init__.py          |  16 +
 .../src/test/python/coilmq/auth/__init__.py     |  34 ++
 .../src/test/python/coilmq/auth/simple.py       | 101 +++++
 .../src/test/python/coilmq/config/__init__.py   | 152 ++++++++
 .../test/python/coilmq/config/coilmq.cfg-sample |  71 ++++
 .../src/test/python/coilmq/config/defaults.cfg  |  41 ++
 ambari-common/src/test/python/coilmq/engine.py  |  94 +++++
 .../src/test/python/coilmq/exception.py         |  43 +++
 .../src/test/python/coilmq/protocol/__init__.py | 342 +++++++++++++++++
 ambari-common/src/test/python/coilmq/queue.py   | 376 +++++++++++++++++++
 .../src/test/python/coilmq/scheduler.py         | 141 +++++++
 .../src/test/python/coilmq/server/__init__.py   |  44 +++
 .../test/python/coilmq/server/socket_server.py  | 192 ++++++++++
 ambari-common/src/test/python/coilmq/start.py   | 226 +++++++++++
 .../src/test/python/coilmq/store/__init__.py    | 189 ++++++++++
 .../src/test/python/coilmq/store/dbm.py         | 262 +++++++++++++
 .../src/test/python/coilmq/store/memory.py      |  76 ++++
 .../src/test/python/coilmq/store/rds.py         |  69 ++++
 .../src/test/python/coilmq/store/sa/__init__.py | 205 ++++++++++
 .../src/test/python/coilmq/store/sa/meta.py     |   9 +
 .../src/test/python/coilmq/store/sa/model.py    |  53 +++
 ambari-common/src/test/python/coilmq/topic.py   | 144 +++++++
 .../src/test/python/coilmq/util/__init__.py     |  16 +
 .../src/test/python/coilmq/util/concurrency.py  |  96 +++++
 .../src/test/python/coilmq/util/frames.py       | 359 ++++++++++++++++++
 .../src/test/python/coilmq/util/six.py          |  16 +
 pom.xml                                         |   2 +
 30 files changed, 3636 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/c7612bcf/NOTICE.txt
----------------------------------------------------------------------
diff --git a/NOTICE.txt b/NOTICE.txt
index 3b65239..50f982c 100644
--- a/NOTICE.txt
+++ b/NOTICE.txt
@@ -4,11 +4,16 @@ Copyright 2011-2013 The Apache Software Foundation
 This product includes software developed at The Apache Software
 Foundation (http://www.apache.org/).
 
-Component ambari-common/src/test/python are under the following copyright:
+Component ambari-common/src/test/python/mock are under the following copyright:
 
 Copyright (c) 2003-2012, Michael Foord
 All rights reserved.
 
+Component ambari-common/src/test/python/coilmq are under the following 
copyright:
+
+Copyright 2009 Hans Lellelid
+All rights reserved.
+
  
 Resource management library derived from Kokki, which is under following 
copyright:
  
@@ -23,5 +28,4 @@ Some rights reserved.
 This product includes Simplejson, library fast encoding and decoding of json. 
(https://github.com/simplejson/simplejson - MIT license)
 
 Copyright (c) 2006 Bob Ippolito.
-All rights reserved.
-
+All rights reserved.
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/c7612bcf/ambari-agent/src/test/python/ambari_agent/BaseStompServerTestCase.py
----------------------------------------------------------------------
diff --git 
a/ambari-agent/src/test/python/ambari_agent/BaseStompServerTestCase.py 
b/ambari-agent/src/test/python/ambari_agent/BaseStompServerTestCase.py
new file mode 100644
index 0000000..8198217
--- /dev/null
+++ b/ambari-agent/src/test/python/ambari_agent/BaseStompServerTestCase.py
@@ -0,0 +1,225 @@
+#!/usr/bin/env python
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+import sys
+import time
+import unittest
+import logging
+import socket
+import select
+import threading
+
+try:
+    from queue import Queue, Empty
+except ImportError:
+    from Queue import Queue, Empty
+
+from coilmq.util.frames import Frame, FrameBuffer
+from coilmq.queue import QueueManager
+from coilmq.topic import TopicManager
+from coilmq.util import frames
+from coilmq.server.socket_server import StompServer, StompRequestHandler, 
ThreadedStompServer
+from coilmq.store.memory import MemoryQueue
+from coilmq.scheduler import FavorReliableSubscriberScheduler, 
RandomQueueScheduler
+from coilmq.protocol import STOMP10
+
+class BaseStompServerTestCase(unittest.TestCase):
+    """
+    Base class for test cases provides the fixtures for setting up the 
multi-threaded
+    unit test infrastructure.
+    We use a combination of C{threading.Event} and C{Queue.Queue} objects to 
faciliate
+    inter-thread communication and lock-stepping the assertions. 
+    """
+
+    def setUp(self):
+
+        self.clients = []
+        self.server = None  # This gets set in the server thread.
+        self.server_address = None  # This gets set in the server thread.
+        self.ready_event = threading.Event()
+
+        addr_bound = threading.Event()
+
+        def start_server():
+            self.server = TestStompServer(('127.0.0.1', 21613),
+                                          ready_event=self.ready_event,
+                                          authenticator=None,
+                                          queue_manager=self._queuemanager(),
+                                          topic_manager=self._topicmanager())
+            self.server_address = self.server.socket.getsockname()
+            addr_bound.set()
+            self.server.serve_forever()
+
+        self.server_thread = threading.Thread(
+            target=start_server, name='server')
+        self.server_thread.start()
+        self.ready_event.wait()
+        addr_bound.wait()
+
+    def _queuemanager(self):
+        """
+        Returns the configured L{QueueManager} instance to use.
+        Can be overridden by subclasses that wish to change out any queue mgr 
parameters.
+        @rtype: L{QueueManager}
+        """
+        return QueueManager(store=MemoryQueue(),
+                            
subscriber_scheduler=FavorReliableSubscriberScheduler(),
+                            queue_scheduler=RandomQueueScheduler())
+
+    def _topicmanager(self):
+        """
+        Returns the configured L{TopicManager} instance to use.
+        Can be overridden by subclasses that wish to change out any topic mgr 
parameters.
+        @rtype: L{TopicManager}
+        """
+        return TopicManager()
+
+    def tearDown(self):
+        for c in self.clients:
+            c.close()
+        import time; print time.time()
+        self.server.shutdown() # server_close takes too much time
+        import time; print time.time()
+        self.server_thread.join()
+        self.ready_event.clear()
+        del self.server_thread
+
+    def _new_client(self, connect=True):
+        """
+        Get a new L{TestStompClient} connected to our test server. 
+        The client will also be registered for close in the tearDown method.
+        @param connect: Whether to issue the CONNECT command.
+        @type connect: C{bool}
+        @rtype: L{TestStompClient}
+        """
+        client = TestStompClient(self.server_address)
+        self.clients.append(client)
+        if connect:
+            client.connect()
+            res = client.received_frames.get(timeout=1)
+            self.assertEqual(res.cmd, frames.CONNECTED)
+        return client
+
+
+class TestStompServer(ThreadedStompServer):
+    """
+    A stomp server for functional tests that uses C{threading.Event} objects
+    to ensure that it stays in sync with the test suite.
+    """
+
+    allow_reuse_address = True
+
+    def __init__(self, server_address,
+                 ready_event=None,
+                 authenticator=None,
+                 queue_manager=None,
+                 topic_manager=None):
+        self.ready_event = ready_event
+        StompServer.__init__(self, server_address, StompRequestHandler,
+                             authenticator=authenticator,
+                             queue_manager=queue_manager,
+                             topic_manager=topic_manager,
+                             protocol=STOMP10)
+
+    def server_activate(self):
+        self.ready_event.set()
+        StompServer.server_activate(self)
+
+
+class TestStompClient(object):
+    """
+    A stomp client for use in testing.
+    This client spawns a listener thread and pushes anything that comes in 
onto the 
+    read_frames queue.
+    @ivar received_frames: A queue of Frame instances that have been received.
+    @type received_frames: C{Queue.Queue} containing any received 
C{stompclient.frame.Frame}
+    """
+
+    def __init__(self, addr, connect=True):
+        """
+        @param addr: The (host,port) tuple for connection.
+        @type addr: C{tuple}
+        @param connect: Whether to connect socket to specified addr.
+        @type connect: C{bool}
+        """
+        self.log = logging.getLogger('%s.%s' % (
+            self.__module__, self.__class__.__name__))
+        self.sock = None
+        self.addr = addr
+        self.received_frames = Queue()
+        self.read_stopped = threading.Event()
+        self.buffer = FrameBuffer()
+        if connect:
+            self._connect()
+
+    def connect(self, headers=None):
+        self.send_frame(Frame(frames.CONNECT, headers=headers))
+
+    def send(self, destination, message, set_content_length=True, 
extra_headers=None):
+        headers = extra_headers or {}
+        headers['destination'] = destination
+        if set_content_length:
+            headers['content-length'] = len(message)
+        self.send_frame(Frame('send', headers=headers, body=message))
+
+    def subscribe(self, destination):
+        self.send_frame(Frame('subscribe', headers={
+                        'destination': destination}))
+
+    def send_frame(self, frame):
+        """
+        Sends a stomp frame.
+        @param frame: The stomp frame to send.
+        @type frame: L{stompclient.frame.Frame}
+        """
+        if not self.connected:
+            raise RuntimeError("Not connected")
+        self.sock.send(frame.pack())
+
+    def _connect(self):
+        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        self.sock.connect(self.addr)
+        self.connected = True
+        self.read_stopped.clear()
+        t = threading.Thread(target=self._read_loop,
+                             name="client-receiver-%s" % hex(id(self)))
+        t.start()
+
+    def _read_loop(self):
+        while self.connected:
+            r, w, e = select.select([self.sock], [], [], 0.1)
+            if r:
+                data = self.sock.recv(1024)
+                self.buffer.append(data)
+                for frame in self.buffer:
+                    self.log.debug("Processing frame: %s" % frame)
+                    self.received_frames.put(frame)
+        self.read_stopped.set()
+        # print "Read loop has been quit! for %s" % id(self)
+
+    def disconnect(self):
+        self.send_frame(Frame('disconnect'))
+
+    def close(self):
+        if not self.connected:
+            raise RuntimeError("Not connected")
+        self.connected = False
+        self.read_stopped.wait(timeout=0.5)
+        self.sock.close()
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/c7612bcf/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py
----------------------------------------------------------------------
diff --git 
a/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py 
b/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py
new file mode 100644
index 0000000..4c61bf6
--- /dev/null
+++ b/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py
@@ -0,0 +1,35 @@
+#!/usr/bin/env python
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+from coilmq.util import frames
+from coilmq.util.frames import Frame
+
+from BaseStompServerTestCase import BaseStompServerTestCase
+
+class TestAgentStompResponses(BaseStompServerTestCase):
+  def test_mock_server_can_start(self):
+    c1 = self._new_client()
+    c1.connect()
+    
+    c1.subscribe('/topics/test')
+    
+    f = Frame(frames.MESSAGE, headers={'destination': '/topics/test'}, 
body='test-body')
+    self.server.topic_manager.send(f)
+    
+    
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/c7612bcf/ambari-common/src/test/python/coilmq/__init__.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/test/python/coilmq/__init__.py 
b/ambari-common/src/test/python/coilmq/__init__.py
new file mode 100644
index 0000000..c7010e0
--- /dev/null
+++ b/ambari-common/src/test/python/coilmq/__init__.py
@@ -0,0 +1,16 @@
+"""
+Top-level CoilMQ package.
+"""
+__authors__ = ['"Hans Lellelid" <[email protected]>']
+__copyright__ = "Copyright 2009 Hans Lellelid"
+__license__ = """Licensed under the Apache License, Version 2.0 (the 
"License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+ 
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License."""

http://git-wip-us.apache.org/repos/asf/ambari/blob/c7612bcf/ambari-common/src/test/python/coilmq/auth/__init__.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/test/python/coilmq/auth/__init__.py 
b/ambari-common/src/test/python/coilmq/auth/__init__.py
new file mode 100644
index 0000000..5ce3b61
--- /dev/null
+++ b/ambari-common/src/test/python/coilmq/auth/__init__.py
@@ -0,0 +1,34 @@
+"""
+Authentication providers.
+
+Because authentication providers are instantiated and configured in the 
application scope
+(and not in the request handler), the authenticator implementations must be 
thread-safe.
+"""
+import abc
+__authors__ = ['"Hans Lellelid" <[email protected]>']
+__copyright__ = "Copyright 2009 Hans Lellelid"
+__license__ = """Licensed under the Apache License, Version 2.0 (the 
"License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+ 
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License."""
+
+
+class Authenticator(object):
+    """ Abstract base class for authenticators. """
+    __metaclass__ = abc.ABCMeta
+
+    @abc.abstractmethod
+    def authenticate(self, login, passcode):
+        """
+        Authenticate the login and passcode.
+
+        @return: Whether user is authenticated.
+        @rtype: C{bool} 
+        """

http://git-wip-us.apache.org/repos/asf/ambari/blob/c7612bcf/ambari-common/src/test/python/coilmq/auth/simple.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/test/python/coilmq/auth/simple.py 
b/ambari-common/src/test/python/coilmq/auth/simple.py
new file mode 100644
index 0000000..cb092bb
--- /dev/null
+++ b/ambari-common/src/test/python/coilmq/auth/simple.py
@@ -0,0 +1,101 @@
+"""
+A simple config-file based authentication module.
+"""
+try:
+    from configparser import ConfigParser
+except ImportError:
+    from ConfigParser import ConfigParser
+    ConfigParser.read_file = ConfigParser.readfp
+
+from coilmq.auth import Authenticator
+from coilmq.config import config
+from coilmq.exception import ConfigError
+
+__authors__ = ['"Hans Lellelid" <[email protected]>']
+__copyright__ = "Copyright 2009 Hans Lellelid"
+__license__ = """Licensed under the Apache License, Version 2.0 (the 
"License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License."""
+
+
+def make_simple():
+    """
+    Create a L{SimpleAuthenticator} instance using values read from coilmq 
configuration.
+
+    @return: The configured L{SimpleAuthenticator}
+    @rtype: L{SimpleAuthenticator}
+    @raise ConfigError: If there is a configuration error.
+    """
+    authfile = config.get('coilmq', 'auth.simple.file')
+    if not authfile:
+        raise ConfigError('Missing configuration parameter: auth.simple.file')
+    sa = SimpleAuthenticator()
+    sa.from_configfile(authfile)
+    return sa
+
+
+class SimpleAuthenticator(Authenticator):
+    """
+    A simple configfile-based authenticator.
+
+    @ivar store:  Authentication key-value store (of logins to passwords).
+    @type store: C{dict} of C{str} to C{str}
+    """
+
+    def __init__(self, store=None):
+        """
+        Initialize the authenticator to use (optionally) specified C{dict} 
store.
+
+        @param store:  Authentication store, C{dict} of logins to passwords.
+        @type store: C{dict} of C{str} to C{str}
+        """
+        if store is None:
+            store = {}
+        self.store = store
+
+    def from_configfile(self, configfile):
+        """
+        Initialize the authentication store from a "config"-style file.
+
+        Auth "config" file is parsed with C{ConfigParser.RawConfigParser} and 
must contain
+        an [auth] section which contains the usernames (keys) and passwords 
(values).
+
+        Example auth file::
+
+            [auth]
+            someuser = somepass
+            anotheruser = anotherpass
+
+        @param configfile: Path to config file or file-like object.
+        @type configfile: C{any}
+        @raise ValueError: If file could not be read or does not contain 
[auth] section.
+        """
+        cfg = ConfigParser()
+        if hasattr(configfile, 'read'):
+            cfg.read_file(configfile)
+        else:
+            filesread = cfg.read(configfile)
+            if not filesread:
+                raise ValueError('Could not parse auth file: %s' % configfile)
+
+        if not cfg.has_section('auth'):
+            raise ValueError('Config file contains no [auth] section.')
+
+        self.store = dict(cfg.items('auth'))
+
+    def authenticate(self, login, passcode):
+        """
+        Authenticate the login and passcode.
+
+        @return: Whether provided login and password match values in store.
+        @rtype: C{bool}
+        """
+        return login in self.store and self.store[login] == passcode

http://git-wip-us.apache.org/repos/asf/ambari/blob/c7612bcf/ambari-common/src/test/python/coilmq/config/__init__.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/test/python/coilmq/config/__init__.py 
b/ambari-common/src/test/python/coilmq/config/__init__.py
new file mode 100644
index 0000000..4428cd4
--- /dev/null
+++ b/ambari-common/src/test/python/coilmq/config/__init__.py
@@ -0,0 +1,152 @@
+"""
+Configuration support functionality.
+
+The global C{config} object (C{ConfigParser.SafeConfigParser} instance) is 
initialized
+with default configuration from the defaults.cfg file, which is located in 
this package.
+In order to ensure that the config contains custom values, you must call the 
C{init_config}
+function during application initialization:
+
+from coilmq.config import config, init_config
+init_config('/path/to/config.cfg')
+
+config.getint('listen_port')
+"""
+import os.path
+import logging
+import logging.config
+import warnings
+import io
+
+try:
+    from configparser import ConfigParser
+except ImportError:
+    from ConfigParser import ConfigParser
+
+
+from pkg_resources import resource_filename, resource_stream
+__authors__ = ['"Hans Lellelid" <[email protected]>']
+__copyright__ = "Copyright 2009 Hans Lellelid"
+__license__ = """Licensed under the Apache License, Version 2.0 (the 
"License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+ 
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License."""
+
+config = ConfigParser()
+config.read(os.path.join(os.path.dirname(__file__), 'defaults.cfg'))
+
+
+def init_config(config_file=None):
+    """
+    Initialize the configuration from a config file.
+
+    The values in config_file will override those already loaded from the 
default
+    configuration file (defaults.cfg, in current package).
+
+    This method does not setup logging.
+
+    @param config_file: The path to a configuration file.
+    @type config_file: C{str}
+
+    @raise ValueError: if the specified config_file could not be read.
+    @see: L{init_logging}  
+    """
+    global config
+
+    if config_file and os.path.exists(config_file):
+        read = config.read([config_file])
+        if not read:
+            raise ValueError(
+                "Could not read configuration from file: %s" % config_file)
+
+
+def init_logging(logfile=None, loglevel=logging.INFO, configfile=None):
+    """
+    Configures the logging using either basic filename + loglevel or passed 
config file path.
+
+    This is performed separately from L{init_config()} in order to support the 
case where 
+    logging should happen independent of (usu. *after*) other aspects of the 
configuration 
+    initialization. For example, if logging may need to be initialized within 
a  daemon 
+    context.
+
+    @param logfile: An explicitly specified logfile destination.  If this is 
specified in addition
+                    to default logging, a warning will be issued.
+    @type logfile: C{str}
+
+    @param loglevel: Which level to use when logging to explicitly specified 
file or stdout.
+    @type loglevel: C{int}
+
+    @param configfile: The path to a configuration file.  This takes 
precedence over any explicitly
+                        specified logfile/loglevel (but a warning will be 
logged if both are specified).
+                        If the file is not specified or does not exist annd no 
logfile was specified, 
+                        then the default.cfg configuration file will be used 
to initialize logging.
+    @type configfile: C{str}
+    """
+    # If a config file was specified, we will use that in place of the
+    # explicitly
+    use_configfile = False
+    if configfile and os.path.exists(configfile):
+        testcfg = ConfigParser()
+        read = testcfg.read(configfile)
+        use_configfile = (read and testcfg.has_section('loggers'))
+
+    if use_configfile:
+        logging.config.fileConfig(configfile)
+        if logfile:
+            msg = "Config file conflicts with explicitly specified logfile; 
config file takes precedence."
+            logging.warn(msg)
+    else:
+        format = '%(asctime)s [%(threadName)s] %(name)s - %(levelname)s - 
%(message)s'
+        if logfile:
+            logging.basicConfig(
+                filename=logfile, level=loglevel, format=format)
+        else:
+            logging.basicConfig(level=loglevel, format=format)
+
+
+def resolve_name(name):
+    """
+    Resolve a dotted name to some object (usually class, module, or function).
+
+    Supported naming formats include:
+        1. path.to.module:method
+        2. path.to.module.ClassName
+
+    >>> resolve_name('coilmq.store.memory.MemoryQueue')
+    <class 'coilmq.store.memory.MemoryQueue'>
+    >>> t = resolve_name('coilmq.store.dbm.make_dbm')
+    >>> import inspect
+    >>> inspect.isfunction(t)
+    True
+    >>> t.__name__
+    'make_dbm'
+
+    @param name: The dotted name (e.g. path.to.MyClass)
+    @type name: C{str}
+
+    @return: The resolved object (class, callable, etc.) or None if not found.
+    """
+    if ':' in name:
+        # Normalize foo.bar.baz:main to foo.bar.baz.main
+        # (since our logic below will handle that)
+        name = '%s.%s' % tuple(name.split(':'))
+
+    name = name.split('.')
+
+    used = name.pop(0)
+    found = __import__(used)
+    for n in name:
+        used = used + '.' + n
+        try:
+            found = getattr(found, n)
+        except AttributeError:
+            __import__(used)
+            found = getattr(found, n)
+
+    return found

http://git-wip-us.apache.org/repos/asf/ambari/blob/c7612bcf/ambari-common/src/test/python/coilmq/config/coilmq.cfg-sample
----------------------------------------------------------------------
diff --git a/ambari-common/src/test/python/coilmq/config/coilmq.cfg-sample 
b/ambari-common/src/test/python/coilmq/config/coilmq.cfg-sample
new file mode 100644
index 0000000..165f1f6
--- /dev/null
+++ b/ambari-common/src/test/python/coilmq/config/coilmq.cfg-sample
@@ -0,0 +1,71 @@
+[coilmq]
+
+; Backend implementation configuration.
+; -------------------------------------
+; Factories for implementations can be configured by passing a dotted-path.
+; This is typically the path to a class or to a callable that returns a 
configured
+; object (that implements the necessary functionality).  The reason to use a 
callable
+; is in cases when the object may need to be initialized with values from the
+; app's configuration.  (For example, a database storage engine will need to 
know
+; the database connect URI.)
+
+; Configuration backend used for storage
+; qstore.factory = coilmq.store.memory.MemoryQueue
+
+; Configure the scheduler implementations used
+; scheduler.subscriber_priority_factory = 
coilmq.scheduler.FavorReliableSubscriberScheduler
+; scheduler.queue_priority_factory = coilmq.scheduler.RandomQueueScheduler
+
+; Configure the authenticator class -- and any options
+; auth.factory = coilmq.auth.simple:make_simple
+; auth.simple.file = /path/to/authfile.ini
+
+; You can include custom logging configuration, if you like.  If you do 
configure
+; logging here, then any commandline logging arguments (e.g. --logfile) will 
be _IGNORED_. 
+; See the Python documentation for the log file format: 
+;      http://docs.python.org/library/logging.html#configuration-file-format
+
+[loggers]
+keys=root,coilmq
+
+[handlers]
+keys=console,file,syslog
+
+[formatters]
+keys=threaded,syslog
+
+[logger_root]
+level=DEBUG
+handlers=console,file,syslog
+
+[logger_coilmq]
+level=DEBUG
+handlers=console,file,syslog
+qualname=coilmq
+propagate=0
+
+[handler_console]
+class=StreamHandler
+level=DEBUG
+formatter=threaded
+args=(sys.stdout,)
+
+[handler_syslog]
+class=handlers.SysLogHandler
+level=DEBUG
+formatter=syslog
+args=('/dev/log',)
+
+[handler_file]
+class=FileHandler
+level=DEBUG
+formatter=threaded
+args=('/tmp/coilmq.log', 'w')
+
+[formatter_syslog]
+format=[%(threadName)s] %(name)s - %(levelname)s - %(message)s
+datefmt= 
+
+[formatter_threaded]
+format=%(asctime)s [%(threadName)s] %(name)s - %(levelname)s - %(message)s
+datefmt=
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/c7612bcf/ambari-common/src/test/python/coilmq/config/defaults.cfg
----------------------------------------------------------------------
diff --git a/ambari-common/src/test/python/coilmq/config/defaults.cfg 
b/ambari-common/src/test/python/coilmq/config/defaults.cfg
new file mode 100644
index 0000000..3e8c27c
--- /dev/null
+++ b/ambari-common/src/test/python/coilmq/config/defaults.cfg
@@ -0,0 +1,41 @@
+; 
---------------------------------------------------------------------------------------
+; Default configuration values for the coilmq broker.
+;
+; Do not modify this file; instead, copy over the coilmq.cfg-sample file in 
this 
+; directory and customize that to your needs.  Those values will override any 
values
+; here.
+; 
---------------------------------------------------------------------------------------
+
+[coilmq]
+
+listen_addr = 127.0.0.1
+listen_port = 61613
+
+; Backend implementation configuration.
+; -------------------------------------
+; Factories for implementations can be configured by passing a dotted-path.
+; This is typically the path to a class or to a callable that returns a 
configured
+; object (that implements the necessary functionality).  The reason to use a 
callable
+; is in cases when the object may need to be initialized with values from the
+; app's configuration.  (For example, a database storage engine will need to 
know
+; the database connect URI.)
+
+; Configuration backend used for storage
+qstore.factory = coilmq.store.memory.MemoryQueue
+
+; Configure some defaults for the DBM store
+qstore.dbm.checkpoint_operations = 100
+qstore.dbm.checkpoint_timeout = 20
+
+; Configure the scheduler implementations used
+scheduler.subscriber_priority_factory = 
coilmq.scheduler.FavorReliableSubscriberScheduler
+scheduler.queue_priority_factory = coilmq.scheduler.RandomQueueScheduler
+
+; Authentication configuration
+; auth.factory = coilmq.auth.simple.make_simple
+; auth.simple.file = coilmq/tests/resources/auth.ini
+
+; When running in debug mode how frequently we should print out 
stats/diagnostic information
+; about queue sizes, memory, etc.
+; (Set to 0 too disable the diagnostic information.)
+debug.stats_poll_interval = 10.0
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/c7612bcf/ambari-common/src/test/python/coilmq/engine.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/test/python/coilmq/engine.py 
b/ambari-common/src/test/python/coilmq/engine.py
new file mode 100644
index 0000000..42352a3
--- /dev/null
+++ b/ambari-common/src/test/python/coilmq/engine.py
@@ -0,0 +1,94 @@
+from __future__ import absolute_import
+
+from coilmq.protocol import STOMP10
+
+"""
+Core STOMP server logic, abstracted from socket transport implementation.
+
+While this is abstracted from the socket transport implementation, it does
+operate on the assumption that there is an available connection to send 
response
+frames.
+
+We're also making some simplified assumptions here which may make this engine
+impractical for [high-performance] use in specifically asynchronous frameworks.
+More specifically, this class was not explicitly designed around async 
patterns, 
+meaning that it would likely be problematic to use with a framework like 
Twisted 
+if the underlying storage implementations were processor intensive (e.g. 
database
+access).  For the default memory storage engines, this shouldn't be a problem.
+
+This code is inspired by the design of the Ruby stompserver project, by 
+Patrick Hurley and Lionel Bouton.  See http://stompserver.rubyforge.org/
+"""
+import logging
+from collections import defaultdict
+
+__authors__ = ['"Hans Lellelid" <[email protected]>']
+__copyright__ = "Copyright 2009 Hans Lellelid"
+__license__ = """Licensed under the Apache License, Version 2.0 (the 
"License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+ 
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License."""
+
+
+class StompEngine(object):
+    """ 
+    The engine provides the core business logic that we use to respond to 
STOMP protocol
+    messages.  
+
+    This class is transport-agnostic; it exposes methods that expect STOMP 
frames and
+    uses the attached connection to send frames to connected clients.
+
+    @ivar connection: The connection (aka "protocol") backing this engine.
+    @type connection: L{coilmq.server.StompConnection}
+
+    @ivar authenticator: An C{Authenticator} implementation to use.  Setting 
this value
+                            will implicitly cause authentication to be 
required.
+    @type authenticator: L{coilmq.auth.Authenticator}
+
+    @ivar queue_manager: The C{QueueManager} implementation to use.
+    @type queue_manager: L{coilmq.queue.QueueManager}
+
+    @ivar topic_manager: The C{TopicManager} implementation to use.
+    @type topic_manager: L{coilmq.topic.TopicManager}
+
+    @ivar transactions: Active transactions for this connection.
+    @type transactions: C{dict} of C{str} to C{list} 
+
+    @ivar connected: Whether engine is connected.
+    @type connected: C{bool}
+    """
+
+    def __init__(self, connection, authenticator, queue_manager, 
topic_manager, protocol=STOMP10):
+        """
+        @param connection: The stomp connection backing this engine.
+        @type connection: L{coilmq.server.StompConnection}
+        """
+        self.log = logging.getLogger('%s.%s' % (
+            self.__class__.__module__, self.__class__.__name__))
+        self.connection = connection
+        self.authenticator = authenticator
+        self.queue_manager = queue_manager
+        self.topic_manager = topic_manager
+        self.connected = False
+        self.transactions = defaultdict(list)
+
+        self.protocol = protocol(self)
+
+    def process_frame(self, frame):
+        self.protocol.process_frame(frame)
+
+    def unbind(self):
+        """
+        Unbinds this connection from queue and topic managers (freeing up 
resources)
+        and resets state.
+        """
+        self.connected = False
+        self.queue_manager.disconnect(self.connection)
+        self.topic_manager.disconnect(self.connection)

http://git-wip-us.apache.org/repos/asf/ambari/blob/c7612bcf/ambari-common/src/test/python/coilmq/exception.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/test/python/coilmq/exception.py 
b/ambari-common/src/test/python/coilmq/exception.py
new file mode 100644
index 0000000..75f476c
--- /dev/null
+++ b/ambari-common/src/test/python/coilmq/exception.py
@@ -0,0 +1,43 @@
+"""
+Exception classes used by CoilMQ.
+
+CoilMQ exceptions extend C{RuntimeError} or other appropriate sub-classes.  
These will be
+thrown if there is not a more appropriate error class already provided by 
builtins.
+"""
+__authors__ = ['"Hans Lellelid" <[email protected]>']
+__copyright__ = "Copyright 2009 Hans Lellelid"
+__license__ = """Licensed under the Apache License, Version 2.0 (the 
"License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+ 
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License."""
+
+
+class ProtocolError(RuntimeError):
+    """
+    Represents an error at the STOMP protocol layer.
+    """
+
+
+class ConfigError(RuntimeError):
+    """
+    Represents an error in the configuration of the application.
+    """
+
+
+class AuthError(RuntimeError):
+    """
+    Represents an authentication or authorization error.
+    """
+
+
+class ClientDisconnected(Exception):
+    """
+    A signal that client has disconnected (so we shouldn't try to keep reading 
from the client).
+    """

http://git-wip-us.apache.org/repos/asf/ambari/blob/c7612bcf/ambari-common/src/test/python/coilmq/protocol/__init__.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/test/python/coilmq/protocol/__init__.py 
b/ambari-common/src/test/python/coilmq/protocol/__init__.py
new file mode 100644
index 0000000..da955ce
--- /dev/null
+++ b/ambari-common/src/test/python/coilmq/protocol/__init__.py
@@ -0,0 +1,342 @@
+import abc
+import uuid
+import socket
+import datetime
+
+from coilmq.exception import ProtocolError, AuthError
+from coilmq.util import frames
+from coilmq.util.frames import Frame, ErrorFrame, ReceiptFrame, ConnectedFrame
+from coilmq.util.concurrency import CoilThreadingTimer
+
+SEND = 'SEND'
+CONNECT = 'CONNECT'
+MESSAGE = 'MESSAGE'
+ERROR = 'ERROR'
+CONNECTED = 'CONNECTED'
+SUBSCRIBE = 'SUBSCRIBE'
+UNSUBSCRIBE = 'UNSUBSCRIBE'
+BEGIN = 'BEGIN'
+COMMIT = 'COMMIT'
+ABORT = 'ABORT'
+ACK = 'ACK'
+DISCONNECT = 'DISCONNECT'
+
+VALID_COMMANDS = ['message', 'connect', 'connected', 'error', 'send',
+                  'subscribe', 'unsubscribe', 'begin', 'commit', 'abort', 
'ack', 'disconnect', 'nack', 'stomp']
+
+
+class STOMP(object):
+
+    __metaclass__ = abc.ABCMeta
+
+    def __init__(self, engine):
+        self.engine = engine
+
+    def stomp(self, frame):
+        self.connect(frame)
+
+    @abc.abstractmethod
+    def process_frame(self, frame):
+        raise NotImplementedError
+
+    @abc.abstractmethod
+    def connect(self, frame):
+        raise NotImplementedError
+
+    @abc.abstractmethod
+    def send(self, frame):
+        raise NotImplementedError
+
+    @abc.abstractmethod
+    def subscribe(self, frame):
+        raise NotImplementedError
+
+    @abc.abstractmethod
+    def unsubscribe(self, frame):
+        raise NotImplementedError
+
+    @abc.abstractmethod
+    def begin(self, frame):
+        raise NotImplementedError
+
+    @abc.abstractmethod
+    def commit(self, frame):
+        raise NotImplementedError
+
+    @abc.abstractmethod
+    def abort(self, frame):
+        raise NotImplementedError
+
+    @abc.abstractmethod
+    def ack(self, frame):
+        raise NotImplementedError
+
+    @abc.abstractmethod
+    def disconnect(self, frame):
+        raise NotImplementedError
+
+
+class STOMP10(STOMP):
+
+    def process_frame(self, frame):
+        """
+        Dispatches a received frame to the appropriate internal method.
+
+        @param frame: The frame that was received.
+        @type frame: C{stompclient.frame.Frame}
+        """
+        cmd_method = frame.cmd.lower()
+
+        if not cmd_method in VALID_COMMANDS:
+            raise ProtocolError("Invalid STOMP command: {}".format(frame.cmd))
+
+        method = getattr(self, cmd_method, None)
+
+        if not self.engine.connected and method not in (self.connect, 
self.stomp):
+            raise ProtocolError("Not connected.")
+
+        try:
+            transaction = frame.headers.get('transaction')
+            if not transaction or method in (self.begin, self.commit, 
self.abort):
+                method(frame)
+            else:
+                if not transaction in self.engine.transactions:
+                    raise ProtocolError(
+                        "Invalid transaction specified: %s" % transaction)
+                self.engine.transactions[transaction].append(frame)
+        except Exception as e:
+            self.engine.log.error("Error processing STOMP frame: %s" % e)
+            self.engine.log.exception(e)
+            try:
+                self.engine.connection.send_frame(ErrorFrame(str(e), str(e)))
+            except Exception as e:  # pragma: no cover
+                self.engine.log.error("Could not send error frame: %s" % e)
+                self.engine.log.exception(e)
+        else:
+            # The protocol is not especially clear here (not sure why I'm 
surprised)
+            # about the expected behavior WRT receipts and errors.  We will 
assume that
+            # the RECEIPT frame should not be sent if there is an error frame.
+            # Also we'll assume that a transaction should not preclude sending 
the receipt
+            # frame.
+            # import pdb; pdb.set_trace()
+            if frame.headers.get('receipt') and method != self.connect:
+                self.engine.connection.send_frame(ReceiptFrame(
+                    receipt=frame.headers.get('receipt')))
+
+    def connect(self, frame, response=None):
+        """
+        Handle CONNECT command: Establishes a new connection and checks auth 
(if applicable).
+        """
+        self.engine.log.debug("CONNECT")
+
+        if self.engine.authenticator:
+            login = frame.headers.get('login')
+            passcode = frame.headers.get('passcode')
+            if not self.engine.authenticator.authenticate(login, passcode):
+                raise AuthError("Authentication failed for %s" % login)
+
+        self.engine.connected = True
+
+        response = response or Frame(frames.CONNECTED)
+        response.headers['session'] = uuid.uuid4()
+
+        # TODO: Do we want to do anything special to track sessions?
+        # (Actually, I don't think the spec actually does anything with this 
at all.)
+        self.engine.connection.send_frame(response)
+
+    def send(self, frame):
+        """
+        Handle the SEND command: Delivers a message to a queue or topic 
(default).
+        """
+        dest = frame.headers.get('destination')
+        if not dest:
+            raise ProtocolError('Missing destination for SEND command.')
+
+        if dest.startswith('/queue/'):
+            self.engine.queue_manager.send(frame)
+        else:
+            self.engine.topic_manager.send(frame)
+
+    def subscribe(self, frame):
+        """
+        Handle the SUBSCRIBE command: Adds this connection to destination.
+        """
+        ack = frame.headers.get('ack')
+        reliable = ack and ack.lower() == 'client'
+
+        self.engine.connection.reliable_subscriber = reliable
+
+        dest = frame.headers.get('destination')
+        if not dest:
+            raise ProtocolError('Missing destination for SUBSCRIBE command.')
+
+        if dest.startswith('/queue/'):
+            self.engine.queue_manager.subscribe(self.engine.connection, dest)
+        else:
+            self.engine.topic_manager.subscribe(self.engine.connection, dest)
+
+    def unsubscribe(self, frame):
+        """
+        Handle the UNSUBSCRIBE command: Removes this connection from 
destination.
+        """
+        dest = frame.headers.get('destination')
+        if not dest:
+            raise ProtocolError('Missing destination for UNSUBSCRIBE command.')
+
+        if dest.startswith('/queue/'):
+            self.engine.queue_manager.unsubscribe(self.engine.connection, dest)
+        else:
+            self.engine.topic_manager.unsubscribe(self.engine.connection, dest)
+
+    def begin(self, frame):
+        """
+        Handles BEGING command: Starts a new transaction.
+        """
+        if not frame.transaction:
+            raise ProtocolError("Missing transaction for BEGIN command.")
+
+        self.engine.transactions[frame.transaction] = []
+
+    def commit(self, frame):
+        """
+        Handles COMMIT command: Commits specified transaction.
+        """
+        if not frame.transaction:
+            raise ProtocolError("Missing transaction for COMMIT command.")
+
+        if not frame.transaction in self.engine.transactions:
+            raise ProtocolError("Invalid transaction: %s" % frame.transaction)
+
+        for tframe in self.engine.transactions[frame.transaction]:
+            del tframe.headers['transaction']
+            self.process_frame(tframe)
+
+        self.engine.queue_manager.clear_transaction_frames(
+            self.engine.connection, frame.transaction)
+        del self.engine.transactions[frame.transaction]
+
+    def abort(self, frame):
+        """
+        Handles ABORT command: Rolls back specified transaction.
+        """
+        if not frame.transaction:
+            raise ProtocolError("Missing transaction for ABORT command.")
+
+        if not frame.transaction in self.engine.transactions:
+            raise ProtocolError("Invalid transaction: %s" % frame.transaction)
+
+        self.engine.queue_manager.resend_transaction_frames(
+            self.engine.connection, frame.transaction)
+        del self.engine.transactions[frame.transaction]
+
+    def ack(self, frame):
+        """
+        Handles the ACK command: Acknowledges receipt of a message.
+        """
+        if not frame.message_id:
+            raise ProtocolError("No message-id specified for ACK command.")
+        self.engine.queue_manager.ack(self.engine.connection, frame)
+
+    def disconnect(self, frame):
+        """
+        Handles the DISCONNECT command: Unbinds the connection.
+
+        Clients are supposed to send this command, but in practice it should 
not be
+        relied upon.
+        """
+        self.engine.log.debug("Disconnect")
+        self.engine.unbind()
+
+
+class STOMP11(STOMP10):
+
+    SUPPORTED_VERSIONS = {'1.0', '1.1'}
+
+    def __init__(self, engine, send_heartbeat_interval=100, 
receive_heartbeat_interval=100, *args, **kwargs):
+        super(STOMP11, self).__init__(engine)
+        self.last_hb = datetime.datetime.now()
+        self.last_hb_sent = datetime.datetime.now()
+        self.timer = CoilThreadingTimer()
+
+        # flags to control heartbeating
+        self.send_hb = self.receive_hb = False
+
+        self.send_heartbeat_interval = 
datetime.timedelta(milliseconds=send_heartbeat_interval)
+        self.receive_heartbeat_interval = 
datetime.timedelta(milliseconds=receive_heartbeat_interval)
+
+    def enable_heartbeat(self, cx, cy, response):
+        if self.send_heartbeat_interval and cy:
+            self.send_heartbeat_interval = max(self.send_heartbeat_interval, 
datetime.timedelta(milliseconds=cy))
+            self.timer.schedule(max(self.send_heartbeat_interval, 
datetime.timedelta(milliseconds=cy)).total_seconds(), self.send_heartbeat)
+        if self.receive_heartbeat_interval and cx:
+            self.timer.schedule(max(self.send_heartbeat_interval, 
datetime.timedelta(milliseconds=cx)).total_seconds(),
+                                self.receive_heartbeat)
+        self.timer.start()
+        response.headers['heart-beat'] = 
'{0},{1}'.format(int(self.send_heartbeat_interval.microseconds / 1000),
+                                                          
int(self.receive_heartbeat_interval.microseconds / 1000))
+
+    def disable_heartbeat(self):
+        self.timer.stop()
+
+    def send_heartbeat(self):
+        # screw it, just send an error frame
+        self.engine.connection.send_frame(ErrorFrame('heartbeat'))
+
+    def receive_heartbeat(self):
+        ago = datetime.datetime.now() - self.last_hb
+        if ago > self.receive_heartbeat_interval:
+            self.engine.log.debug("No heartbeat was received for {0} 
seconds".format(ago.total_seconds()))
+            self.engine.unbind()
+
+    def connect(self, frame, response=None):
+        connected_frame = Frame(frames.CONNECTED)
+        self._negotiate_protocol(frame, connected_frame)
+        heart_beat = frame.headers.get('heart-beat', '0,0')
+        if heart_beat:
+            self.enable_heartbeat(*map(int, heart_beat.split(',')), 
response=connected_frame)
+        super(STOMP11, self).connect(frame, response=connected_frame)
+
+    def nack(self, frame):
+        """
+        Handles the NACK command: Unacknowledges receipt of a message.
+        For now, this is just a placeholder to implement this version of the 
protocol
+        """
+        if not frame.headers.get('message-id'):
+            raise ProtocolError("No message-id specified for NACK command.")
+        if not frame.headers.get('subscription'):
+            raise ProtocolError("No subscription specified for NACK command.")
+
+    def _negotiate_protocol(self, frame, response):
+        client_versions = frame.headers.get('accept-version')
+        if not client_versions:
+            raise ProtocolError('No version specified')
+        common = set(client_versions.split(',')) & self.SUPPORTED_VERSIONS
+        if not common:
+            versions = ','.join(self.SUPPORTED_VERSIONS)
+            self.engine.connection.send_frame(Frame(
+                    frames.ERROR,
+                    headers={'version': versions, 'content-type': 
frames.TEXT_PLAIN},
+                    body='Supported protocol versions are {0}'.format(versions)
+            ))
+        else:
+            response.headers['version'] = max(common)
+            protocol_class = PROTOCOL_MAP[response.headers['version']]
+            if type(self) is not protocol_class:
+                self.engine.protocol = protocol_class(self.engine)
+                self.engine.protocol.connect(frame, response=response)
+
+
+class STOMP12(STOMP11):
+
+    SUPPORTED_VERSIONS = STOMP11.SUPPORTED_VERSIONS.union({'1.2', })
+
+    def connect(self, frame, response=None):
+        host = frame.headers.get('host')
+        if not host:
+            raise ProtocolError('"host" header is required')
+        if host != socket.getfqdn():
+            raise ProtocolError('Virtual hosting is not supported or host is 
unknown')
+        super(STOMP12, self).connect(frame, response)
+
+
+PROTOCOL_MAP = {'1.0': STOMP10, '1.1': STOMP11, '1.2': STOMP12}

http://git-wip-us.apache.org/repos/asf/ambari/blob/c7612bcf/ambari-common/src/test/python/coilmq/queue.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/test/python/coilmq/queue.py 
b/ambari-common/src/test/python/coilmq/queue.py
new file mode 100644
index 0000000..0a1160b
--- /dev/null
+++ b/ambari-common/src/test/python/coilmq/queue.py
@@ -0,0 +1,376 @@
+"""
+Queue manager, queue implementation, and supporting classes.
+
+This code is inspired by the design of the Ruby stompserver project, by 
+Patrick Hurley and Lionel Bouton.  See http://stompserver.rubyforge.org/
+"""
+import logging
+import threading
+import uuid
+from collections import defaultdict
+
+from coilmq.scheduler import FavorReliableSubscriberScheduler, 
RandomQueueScheduler
+from coilmq.util.concurrency import synchronized
+
+__authors__ = ['"Hans Lellelid" <[email protected]>']
+__copyright__ = "Copyright 2009 Hans Lellelid"
+__license__ = """Licensed under the Apache License, Version 2.0 (the 
"License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+ 
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License."""
+
+lock = threading.RLock()
+
+
+class QueueManager(object):
+    """
+    Class that manages distribution of messages to queue subscribers.
+
+    This class uses C{threading.RLock} to guard the public methods.  This is 
probably
+    a bit excessive, given 1) the actomic nature of basic C{dict} read/write 
operations 
+    and  2) the fact that most of the internal data structures are keying off 
of the 
+    STOMP connection, which is going to be thread-isolated.  That said, this 
seems like 
+    the technically correct approach and should increase the chance of this 
code being
+    portable to non-GIL systems. 
+
+    @ivar store: The queue storage backend to use.
+    @type store: L{coilmq.store.QueueStore}
+
+    @ivar subscriber_scheduler: The scheduler that chooses which subscriber to 
send
+                                    messages to.
+    @type subscriber_scheduler: L{coilmq.scheduler.SubscriberPriorityScheduler}
+
+    @ivar queue_scheduler: The scheduler that chooses which queue to select 
for sending
+                                    backlogs for a single connection.
+    @type queue_scheduler: L{coilmq.scheduler.QueuePriorityScheduler}
+
+    @ivar _queues: A dict of registered queues, keyed by destination.
+    @type _queues: C{dict} of C{str} to C{set} of 
L{coilmq.server.StompConnection}
+
+    @ivar _pending: All messages waiting for ACK from clients.
+    @type _pending: C{dict} of L{coilmq.server.StompConnection} to 
C{stompclient.frame.Frame}
+
+    @ivar _transaction_frames: Frames that have been ACK'd within a 
transaction.
+    @type _transaction_frames: C{dict} of L{coilmq.server.StompConnection} to 
C{dict} of C{str} to C{stompclient.frame.Frame}
+    """
+
+    def __init__(self, store, subscriber_scheduler=None, queue_scheduler=None):
+        """
+        @param store: The queue storage backend.
+        @type store: L{coilmq.store.QueueStore}
+
+        @param subscriber_scheduler: The scheduler that chooses which 
subscriber to send
+                                    messages to.
+        @type subscriber_scheduler: 
L{coilmq.scheduler.SubscriberPriorityScheduler}
+
+        @param queue_scheduler: The scheduler that chooses which queue to 
select for sending
+                                    backlogs for a single connection.
+        @type queue_scheduler: L{coilmq.scheduler.QueuePriorityScheduler}
+        """
+        self.log = logging.getLogger(
+            '%s.%s' % (__name__, self.__class__.__name__))
+
+        # Use default schedulers, if they're not specified
+        if subscriber_scheduler is None:
+            subscriber_scheduler = FavorReliableSubscriberScheduler()
+
+        if queue_scheduler is None:
+            queue_scheduler = RandomQueueScheduler()
+
+        # This lock var is required by L{synchronized} decorator.
+        self._lock = threading.RLock()
+
+        self.store = store
+        self.subscriber_scheduler = subscriber_scheduler
+        self.queue_scheduler = queue_scheduler
+
+        self._queues = defaultdict(set)
+        self._transaction_frames = defaultdict(lambda: defaultdict(list))
+        self._pending = {}
+
+    @synchronized(lock)
+    def close(self):
+        """
+        Closes all resources/backends associated with this queue manager.
+        """
+        self.log.info("Shutting down queue manager.")
+        if hasattr(self.store, 'close'):
+            self.store.close()
+
+        if hasattr(self.subscriber_scheduler, 'close'):
+            self.subscriber_scheduler.close()
+
+        if hasattr(self.queue_scheduler, 'close'):
+            self.queue_scheduler.close()
+
+    @synchronized(lock)
+    def subscriber_count(self, destination=None):
+        """
+        Returns a count of the number of subscribers.
+
+        If destination is specified then it only returns count of subscribers 
+        for that specific destination.
+
+        @param destination: The optional topic/queue destination (e.g. 
'/queue/foo')
+        @type destination: C{str}
+        """
+        if destination:
+            return len(self._queues[destination])
+        else:
+            # total them up
+            total = 0
+            for k in self._queues.keys():
+                total += len(self._queues[k])
+            return total
+
+    @synchronized(lock)
+    def subscribe(self, connection, destination):
+        """
+        Subscribes a connection to the specified destination (topic or queue). 
+
+        @param connection: The connection to subscribe.
+        @type connection: L{coilmq.server.StompConnection}
+
+        @param destination: The topic/queue destination (e.g. '/queue/foo')
+        @type destination: C{str} 
+        """
+        self.log.debug("Subscribing %s to %s" % (connection, destination))
+        self._queues[destination].add(connection)
+        self._send_backlog(connection, destination)
+
+    @synchronized(lock)
+    def unsubscribe(self, connection, destination):
+        """
+        Unsubscribes a connection from a destination (topic or queue).
+
+        @param connection: The client connection to unsubscribe.
+        @type connection: L{coilmq.server.StompConnection}
+
+        @param destination: The topic/queue destination (e.g. '/queue/foo')
+        @type destination: C{str} 
+        """
+        self.log.debug("Unsubscribing %s from %s" % (connection, destination))
+        if connection in self._queues[destination]:
+            self._queues[destination].remove(connection)
+
+        if not self._queues[destination]:
+            del self._queues[destination]
+
+    @synchronized(lock)
+    def disconnect(self, connection):
+        """
+        Removes a subscriber connection, ensuring that any pending commands 
get requeued.
+
+        @param connection: The client connection to unsubscribe.
+        @type connection: L{coilmq.server.StompConnection}
+        """
+        self.log.debug("Disconnecting %s" % connection)
+        if connection in self._pending:
+            pending_frame = self._pending[connection]
+            self.store.requeue(pending_frame.headers.get(
+                'destination'), pending_frame)
+            del self._pending[connection]
+
+        for dest in list(self._queues.keys()):
+            if connection in self._queues[dest]:
+                self._queues[dest].remove(connection)
+            if not self._queues[dest]:
+                # This won't trigger RuntimeError, since we're using keys()
+                del self._queues[dest]
+
+    @synchronized(lock)
+    def send(self, message):
+        """
+        Sends a MESSAGE frame to an eligible subscriber connection.
+
+        Note that this method will modify the incoming message object to 
+        add a message-id header (if not present) and to change the command
+        to 'MESSAGE' (if it is not).
+
+        @param message: The message frame.
+        @type message: C{stompclient.frame.Frame}
+        """
+        dest = message.headers.get('destination')
+        if not dest:
+            raise ValueError(
+                "Cannot send frame with no destination: %s" % message)
+
+        message.cmd = 'message'
+
+        message.headers.setdefault('message-id', str(uuid.uuid4()))
+
+        # Grab all subscribers for this destination that do not have pending
+        # frames
+        subscribers = [s for s in self._queues[dest]
+                       if s not in self._pending]
+
+        if not subscribers:
+            self.log.debug(
+                "No eligible subscribers; adding message %s to queue %s" % 
(message, dest))
+            self.store.enqueue(dest, message)
+        else:
+            selected = self.subscriber_scheduler.choice(subscribers, message)
+            self.log.debug("Delivering message %s to subscriber %s" %
+                           (message, selected))
+            self._send_frame(selected, message)
+
+    @synchronized(lock)
+    def ack(self, connection, frame, transaction=None):
+        """
+        Acknowledge receipt of a message.
+
+        If the `transaction` parameter is non-null, the frame being ack'd
+        will be queued so that it can be requeued if the transaction
+        is rolled back. 
+
+        @param connection: The connection that is acknowledging the frame.
+        @type connection: L{coilmq.server.StompConnection}
+
+        @param frame: The frame being acknowledged.
+
+        """
+        self.log.debug("ACK %s for %s" % (frame, connection))
+
+        if connection in self._pending:
+            pending_frame = self._pending[connection]
+            # Make sure that the frame being acknowledged matches
+            # the expected frame
+            if pending_frame.headers.get('message-id') != 
frame.headers.get('message-id'):
+                self.log.warning(
+                    "Got a ACK for unexpected message-id: %s" % 
frame.message_id)
+                self.store.requeue(pending_frame.destination, pending_frame)
+                # (The pending frame will be removed further down)
+
+            if transaction is not None:
+                self._transaction_frames[connection][
+                    transaction].append(pending_frame)
+
+            del self._pending[connection]
+            self._send_backlog(connection)
+
+        else:
+            self.log.debug("No pending messages for %s" % connection)
+
+    @synchronized(lock)
+    def resend_transaction_frames(self, connection, transaction):
+        """
+        Resend the messages that were ACK'd in specified transaction.
+
+        This is called by the engine when there is an abort command.
+
+        @param connection: The client connection that aborted the transaction.
+        @type connection: L{coilmq.server.StompConnection}
+
+        @param transaction: The transaction id (which was aborted).
+        @type transaction: C{str}
+        """
+        for frame in self._transaction_frames[connection][transaction]:
+            self.send(frame)
+
+    @synchronized(lock)
+    def clear_transaction_frames(self, connection, transaction):
+        """
+        Clears out the queued ACK frames for specified transaction. 
+
+        This is called by the engine when there is a commit command.
+
+        @param connection: The client connection that committed the 
transaction.
+        @type connection: L{coilmq.server.StompConnection}
+
+        @param transaction: The transaction id (which was committed).
+        @type transaction: C{str}
+        """
+        try:
+            del self._transaction_frames[connection][transaction]
+        except KeyError:
+            # There may not have been any ACK frames for this transaction.
+            pass
+
+    def _send_backlog(self, connection, destination=None):
+        """
+        Sends any queued-up messages for the (optionally) specified 
destination to connection.
+
+        If the destination is not provided, a destination is chosen using the 
+        L{QueueManager.queue_scheduler} scheduler algorithm.
+
+        (This method assumes it is being called from within a lock-guarded 
public
+        method.)  
+
+        @param connection: The client connection.
+        @type connection: L{coilmq.server.StompConnection}
+
+        @param destination: The topic/queue destination (e.g. '/queue/foo')
+        @type destination: C{str} 
+
+        @raise Exception: if the underlying connection object raises an error, 
the message
+                            will be re-queued and the error will be re-raised. 
 
+        """
+        if destination is None:
+            # Find all destinations that have frames and that contain this
+            # connection (subscriber).
+            eligible_queues = dict([(dest, q) for (dest, q) in 
self._queues.items()
+                                    if connection in q and 
self.store.has_frames(dest)])
+            destination = self.queue_scheduler.choice(
+                eligible_queues, connection)
+            if destination is None:
+                self.log.debug(
+                    "No eligible queues (with frames) for subscriber %s" % 
connection)
+                return
+
+        self.log.debug("Sending backlog to %s for destination %s" %
+                       (connection, destination))
+        if connection.reliable_subscriber:
+            # only send one message (waiting for ack)
+            frame = self.store.dequeue(destination)
+            if frame:
+                try:
+                    self._send_frame(connection, frame)
+                except Exception as x:
+                    self.log.error(
+                        "Error sending message %s (requeueing): %s" % (frame, 
x))
+                    self.store.requeue(destination, frame)
+                    raise
+        else:
+            for frame in self.store.frames(destination):
+                try:
+                    self._send_frame(connection, frame)
+                except Exception as x:
+                    self.log.error(
+                        "Error sending message %s (requeueing): %s" % (frame, 
x))
+                    self.store.requeue(destination, frame)
+                    raise
+
+    def _send_frame(self, connection, frame):
+        """
+        Sends a frame to a specific subscriber connection.
+
+        (This method assumes it is being called from within a lock-guarded 
public
+        method.)
+
+        @param connection: The subscriber connection object to send to.
+        @type connection: L{coilmq.server.StompConnection}
+
+        @param frame: The frame to send.
+        @type frame: L{stompclient.frame.Frame}
+        """
+        assert connection is not None
+        assert frame is not None
+
+        self.log.debug("Delivering frame %s to connection %s" %
+                       (frame, connection))
+
+        if connection.reliable_subscriber:
+            if connection in self._pending:
+                raise RuntimeError("Connection already has a pending frame.")
+            self.log.debug(
+                "Tracking frame %s as pending for connection %s" % (frame, 
connection))
+            self._pending[connection] = frame
+
+        connection.send_frame(frame)

http://git-wip-us.apache.org/repos/asf/ambari/blob/c7612bcf/ambari-common/src/test/python/coilmq/scheduler.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/test/python/coilmq/scheduler.py 
b/ambari-common/src/test/python/coilmq/scheduler.py
new file mode 100644
index 0000000..4ac7c3e
--- /dev/null
+++ b/ambari-common/src/test/python/coilmq/scheduler.py
@@ -0,0 +1,141 @@
+"""
+Classes that provide delivery scheduler implementations.
+
+The default implementation used by the system for determining which subscriber
+(connection) should receive a message is simply a random choice but favoring
+reliable subscribers.  Developers can write their own delivery schedulers, 
which 
+should implement the methods defined in L{QueuePriorityScheduler} if they would
+like to customize the behavior.
+"""
+import abc
+import random
+
+__authors__ = ['"Hans Lellelid" <[email protected]>']
+__copyright__ = "Copyright 2009 Hans Lellelid"
+__license__ = """Licensed under the Apache License, Version 2.0 (the 
"License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+ 
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License."""
+
+
+class SubscriberPriorityScheduler(object):
+    """ Abstract base class for choosing which recipient (subscriber) should 
receive a message. """
+    __metaclass__ = abc.ABCMeta
+
+    @abc.abstractmethod
+    def choice(self, subscribers, message):
+        """
+        Chooses which subscriber (from list) should recieve specified message.
+
+        @param subscribers: Collection of subscribed connections eligible to 
receive message. 
+        @type subscribers: C{list} of L{coilmq.server.StompConnection}
+
+        @param message: The message to be delivered. 
+        @type message: L{stompclient.frame.Frame}
+
+        @return: A selected subscriber from the list or None if no subscriber 
could be chosen (e.g. list is empty).
+        @rtype: L{coilmq.server.StompConnection}
+        """
+
+
+class QueuePriorityScheduler(object):
+    """
+    Abstract base class for objects that provide a way to prioritize the 
queues.
+    """
+
+    def choice(self, queues, connection):
+        """
+        Choose which queue to select for messages to specified connection.
+
+        @param queues: A C{dict} mapping queue name to queues (sets of frames) 
to which 
+                        specified connection is subscribed.
+        @type queues:  C{dict} of C{str} to C{set} of 
L{stompclient.frame.Frame}
+
+        @param connection: The connection that is going to be delivered the 
frame(s).
+        @type connection: L{coilmq.server.StompConnection}
+
+        @return: A selected queue destination (name) or None if queues C{dict} 
is empty.
+        @rtype: C{str}
+        """
+        raise NotImplementedError
+
+
+class RandomSubscriberScheduler(SubscriberPriorityScheduler):
+    """ A delivery scheduler that chooses a random subscriber for message 
recipient. """
+
+    def choice(self, subscribers, message):
+        """
+        Chooses a random connection from subscribers to deliver specified 
message.
+
+        @param subscribers: Collection of subscribed connections to 
destination. 
+        @type subscribers: C{list} of L{coilmq.server.StompConnection}
+
+        @param message: The message to be delivered. 
+        @type message: L{stompclient.frame.Frame}
+
+        @return: A random subscriber from the list or None if list is empty.
+        @rtype: L{coilmq.server.StompConnection}
+        """
+        if not subscribers:
+            return None
+        return random.choice(subscribers)
+
+
+class FavorReliableSubscriberScheduler(SubscriberPriorityScheduler):
+    """
+    A random delivery scheduler which prefers reliable subscribers.
+    """
+
+    def choice(self, subscribers, message):
+        """
+        Choose a random connection, favoring those that are reliable from
+        subscriber pool to deliver specified message.
+
+        @param subscribers: Collection of subscribed connections to 
destination. 
+        @type subscribers: C{list} of L{coilmq.server.StompConnection}
+
+        @param message: The message to be delivered. 
+        @type message: L{stompclient.frame.Frame}
+
+        @return: A random subscriber from the list or None if list is empty.
+        @rtype: L{coilmq.server.StompConnection}
+        """
+        if not subscribers:
+            return None
+        reliable_subscribers = [
+            s for s in subscribers if s.reliable_subscriber]
+        if reliable_subscribers:
+            return random.choice(reliable_subscribers)
+        else:
+            return random.choice(subscribers)
+
+
+class RandomQueueScheduler(QueuePriorityScheduler):
+    """
+    Implementation of L{QueuePriorityScheduler} that selects a random queue 
from the list.
+    """
+
+    def choice(self, queues, connection):
+        """
+        Chooses a random queue for messages to specified connection.
+
+        @param queues: A C{dict} mapping queue name to queues (sets of frames) 
to which 
+                        specified connection is subscribed.
+        @type queues:  C{dict} of C{str} to C{set} of 
L{stompclient.frame.Frame}
+
+        @param connection: The connection that is going to be delivered the 
frame(s).
+        @type connection: L{coilmq.server.StompConnection}
+
+        @return: A random queue destination or None if list is empty.
+        @rtype: C{str}
+        """
+        if not queues:
+            return None
+        return random.choice(list(queues.keys()))

http://git-wip-us.apache.org/repos/asf/ambari/blob/c7612bcf/ambari-common/src/test/python/coilmq/server/__init__.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/test/python/coilmq/server/__init__.py 
b/ambari-common/src/test/python/coilmq/server/__init__.py
new file mode 100644
index 0000000..1a7c52a
--- /dev/null
+++ b/ambari-common/src/test/python/coilmq/server/__init__.py
@@ -0,0 +1,44 @@
+"""
+Package of available server implementations and shared 
functionality/interfaces.
+
+CoilMQ is designed for the Python StompServer reference socket server 
(specifically 
+multi-threaded); however, some alternative implementation examples are also 
provided. 
+"""
+import abc
+
+__authors__ = ['"Hans Lellelid" <[email protected]>']
+__license__ = """Licensed under the Apache License, Version 2.0 (the 
"License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+ 
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License."""
+
+
+class StompConnection(object):
+    """
+    An "interface" for server implementation classes to "implement". 
+
+    This class serves primarily as a means to document the API that CoilMQ 
will expect
+    the connection object to implement.
+
+    @ivar reliable_subscriber: Whether this client will ACK all messages.
+    @type reliable_subscriber: C{bool}
+    """
+    __metaclass__ = abc.ABCMeta
+
+    reliable_subscriber = False
+
+    @abc.abstractmethod
+    def send_frame(self, frame):
+        """
+        Uses this connection implementation to send the specified frame to a 
connected client.
+
+        @param frame: The STOMP frame to send.
+        @type frame: C{stompclient.frame.Frame}  
+        """

http://git-wip-us.apache.org/repos/asf/ambari/blob/c7612bcf/ambari-common/src/test/python/coilmq/server/socket_server.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/test/python/coilmq/server/socket_server.py 
b/ambari-common/src/test/python/coilmq/server/socket_server.py
new file mode 100644
index 0000000..b4f081e
--- /dev/null
+++ b/ambari-common/src/test/python/coilmq/server/socket_server.py
@@ -0,0 +1,192 @@
+"""
+The default/recommended SocketServer-based server implementation. 
+"""
+import logging
+import socket
+import threading
+try:
+    from socketserver import BaseRequestHandler, TCPServer, ThreadingMixIn
+except ImportError:
+    from SocketServer import BaseRequestHandler, TCPServer, ThreadingMixIn
+
+
+from coilmq.util.frames import FrameBuffer
+from coilmq.server import StompConnection
+from coilmq.engine import StompEngine
+from coilmq.exception import ClientDisconnected
+
+__authors__ = ['"Hans Lellelid" <[email protected]>']
+__copyright__ = "Copyright 2009 Hans Lellelid"
+__license__ = """Licensed under the Apache License, Version 2.0 (the 
"License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+ 
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License."""
+
+
+class StompRequestHandler(BaseRequestHandler, StompConnection):
+    """
+    Class that will be instantiated to handle STOMP connections.
+
+    This class will be instantiated once per connection to the server.  In a 
multi-threaded
+    context, that means that instances of this class are scoped to a single 
thread.  It should
+    be noted that while the L{coilmq.engine.StompEngine} instance will be 
thread-local, the 
+    storage containers configured into the engine are not thread-local (and 
hence must be
+    thread-safe). 
+
+    @ivar buffer: A StompBuffer instance which buffers received data (to 
ensure we deal with
+                    complete STOMP messages.
+    @type buffer: C{stompclient.util.FrameBuffer}
+
+    @ivar engine: The STOMP protocol engine.
+    @type engine: L{coilmq.engine.StompEngine}
+
+    @ivar debug: Whether to enable extra-verbose debug logging.  (Will be 
logged at debug level.)
+    @type debug: C{bool}
+    """
+
+    def setup(self):
+        if self.server.timeout is not None:
+            self.request.settimeout(self.server.timeout)
+        self.debug = False
+        self.log = logging.getLogger('%s.%s' % (self.__module__, 
self.__class__.__name__))
+        self.buffer = FrameBuffer()
+        self.engine = StompEngine(connection=self,
+                                  authenticator=self.server.authenticator,
+                                  queue_manager=self.server.queue_manager,
+                                  topic_manager=self.server.topic_manager,
+                                  protocol=self.server.protocol)
+
+    def handle(self):
+        """
+        Handle a new socket connection.
+        """
+        # self.request is the TCP socket connected to the client
+        try:
+            while not self.server._shutdown_request_event.is_set():
+                try:
+                    data = self.request.recv(8192)
+                    if not data:
+                        break
+                    if self.debug:
+                        self.log.debug("RECV: %r" % data)
+                    self.buffer.append(data)
+
+                    for frame in self.buffer:
+                        self.log.debug("Processing frame: %s" % frame)
+                        self.engine.process_frame(frame)
+                        if not self.engine.connected:
+                            raise ClientDisconnected()
+                except socket.timeout:  # pragma: no cover
+                    pass
+        except ClientDisconnected:
+            self.log.debug("Client disconnected, discontinuing read loop.")
+        except Exception as e:  # pragma: no cover
+            self.log.error("Error receiving data (unbinding): %s" % e)
+            self.engine.unbind()
+            raise
+
+    def finish(self):
+        """
+        Normal (non-error) termination of request.
+
+        Unbinds the engine.
+        @see: L{coilmq.engine.StompEngine.unbind}
+        """
+        self.engine.unbind()
+
+    def send_frame(self, frame):
+        """ Sends a frame to connected socket client.
+
+        @param frame: The frame to send.
+        @type frame: C{stompclient.frame.Frame}
+        """
+        packed = frame.pack()
+        if self.debug:  # pragma: no cover
+            self.log.debug("SEND: %r" % packed)
+        self.request.sendall(packed)
+
+
+class StompServer(TCPServer):
+    """
+    Subclass of C{StompServer.TCPServer} to handle new connections with 
+    instances of L{StompRequestHandler}.
+
+    @ivar authenticator: The authenticator to use.
+    @type authenticator: L{coilmq.auth.Authenticator}
+
+    @ivar queue_manager: The queue manager to use.
+    @type queue_manager: L{coilmq.queue.QueueManager}
+
+    @ivar topic_manager: The topic manager to use.
+    @type topic_manager: L{coilmq.topic.TopicManager}
+    """
+
+    # This causes the SO_REUSEADDR option to be set on the socket, allowing
+    # server to rebind to the same address (w/o waiting for connections to
+    # leave TIME_WAIT after unclean disconnect).
+    allow_reuse_address = True
+
+    def __init__(self, server_address, RequestHandlerClass=None, timeout=3.0,
+                 authenticator=None, queue_manager=None, topic_manager=None, 
protocol=None):
+        """
+        Extension to C{TCPServer} constructor to provide mechanism for 
providing implementation classes.
+
+        @param server_address: The (address,port) C{tuple}
+        @param RequestHandlerClass: The class to use for handling requests.
+        @param timeout: The timeout for the underlying socket.
+        @keyword authenticator: The configure L{coilmq.auth.Authenticator} 
object to use.
+        @keyword queue_manager: The configured L{coilmq.queue.QueueManager} 
object to use.
+        @keyword topic_manager: The configured L{coilmq.topic.TopicManager} 
object to use. 
+        """
+        self.log = logging.getLogger('%s.%s' % (
+            self.__module__, self.__class__.__name__))
+        if not RequestHandlerClass:
+            RequestHandlerClass = StompRequestHandler
+        self.timeout = timeout
+        self.authenticator = authenticator
+        self.queue_manager = queue_manager
+        self.topic_manager = topic_manager
+        self.protocol = protocol
+        self._serving_event = threading.Event()
+        self._shutdown_request_event = threading.Event()
+        TCPServer.__init__(self, server_address, RequestHandlerClass)
+
+    def server_close(self):
+        """
+        Closes the socket server and any associated resources.
+        """
+        self.log.debug("Closing the socket server connection.")
+        TCPServer.server_close(self)
+        self.queue_manager.close()
+        self.topic_manager.close()
+        if hasattr(self.authenticator, 'close'):
+            self.authenticator.close()
+        self.shutdown()
+
+    def shutdown(self):
+        if self._serving_event.is_set():
+            self._shutdown_request_event.set()
+            self._serving_event.clear()
+            TCPServer.shutdown(self)
+
+    def serve_forever(self, poll_interval=0.5):
+        """Handle one request at a time until shutdown.
+
+        Polls for shutdown every poll_interval seconds. Ignores
+        self.timeout. If you need to do periodic tasks, do them in
+        another thread.
+        """
+        self._serving_event.set()
+        self._shutdown_request_event.clear()
+        TCPServer.serve_forever(self, poll_interval=poll_interval)
+
+
+class ThreadedStompServer(ThreadingMixIn, StompServer):
+    pass

Reply via email to