Author: aconway
Date: Wed Apr 30 01:13:24 2014
New Revision: 1591167
URL: http://svn.apache.org/r1591167
Log:
Author: Alan Conway <[email protected]>
Date: Thu Mar 27 15:38:40 2014 -0400
--- log message follows this line ---
NO-JIRA: First cut of system_test.py, a support library for router system tests.
This is a work in progress, see the module doc in system_test.py for more info.
system_tests_broker.py provides an example of how to use the library.
Added:
qpid/dispatch/trunk/tests/system_test.py (with props)
qpid/dispatch/trunk/tests/system_tests_broker.py (with props)
Added: qpid/dispatch/trunk/tests/system_test.py
URL:
http://svn.apache.org/viewvc/qpid/dispatch/trunk/tests/system_test.py?rev=1591167&view=auto
==============================================================================
--- qpid/dispatch/trunk/tests/system_test.py (added)
+++ qpid/dispatch/trunk/tests/system_test.py Wed Apr 30 01:13:24 2014
@@ -0,0 +1,299 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+"""System test library, provides tools for tests that start multiple processes,
+with special support for qpidd and qdrouter processes.
+Features:
+- Create separate directories for each test.
+- Save logs, sub-process output, core files etc.
+- Automated clean-up after tests: kill sub-processes etc.
+- Tools to manipulate qpidd and qdrouter configuration files.
+- Sundry other tools.
+
+FIXME aconway 2014-03-27: we need to check what is installed & skip tests that
can't be run.
+
+Current we assume the following are installed:
+ - proton with python bindings
+ - qpidd with AMQP 1.0 support
+ - qpidtoollibs python module from qpid/tools
+ - qpid_messaging python module from qpid/cpp
+
+You can set this up from packages on fedora:
+
+ sudo yum install protonc qpid-cpp-server qpid-tools python-qpid-proton
python-qpid_messaging
+
+Here's how to build from source assuming you use default install prefix
/usr/local
+
+With a qpid checkout at $QPID:
+ cd $QPID/qpid/cpp/<build-directory>; make install
+ cd $QPID/qpid/tools; ./setup.py install --prefix /usr/local
+ cd $QPID/qpid/python; ./setup.py install --prefix /usr/local
+With a qpid-proton checkout at $PROTON
+ cd $PROTON/<build-directory>; make install
+
+And finally make sure to set up your environment:
+
+export PATH="$PATH:/usr/local/sbin:/usr/local/bin"
+export
PYTHONPATH="$PYTHONPATH:/usr/local/lib/proton/bindings/python:/usr/local/lib64/proton/bindings/python:/usr/local/lib/python2.7/site-packages:/usr/local/lib64/python2.7/site-packages"
+export LD_LIBRARY_PATH="$LD_LIBRARY_PATH:/usr/local/lib64"
+"""
+
+import sys, os, time, socket, random
+import subprocess, tempfile, shutil
+import unittest
+import qpidtoollibs
+import qpid_messaging as qm
+import proton
+from proton import Message, PENDING, ACCEPTED, REJECTED, RELEASED
+from copy import copy
+
+HOME=os.environ.get('QPID_DISPATCH_HOME')
+assert HOME, "QPID_DISPATCH_HOME not defined"
+
+def retry_delay(deadline, timeout, delay, max_delay):
+ """For internal use in retry. Sleep as required
+ and return the new delay or None if retry should time out"""
+ remaining = deadline - time.time()
+ if remaining <= 0: return None
+ time.sleep(min(delay, remaining))
+ return min(delay*2, max_delay)
+
+def retry(function, timeout=10, delay=.001, max_delay=1):
+ """Call function until it returns a true value or timeout expires.
+ Double the delay for each retry up to max_delay.
+ Returns what function returns or None if timeout expires.
+ """
+ deadline = time.time() + timeout
+ while True:
+ ret = function()
+ if ret:
+ return ret
+ else:
+ delay = retry_delay(deadline, timeout, delay, max_delay)
+ if delay is None: return None
+
+def retry_exception(function, timeout=10, delay=.001, max_delay=1,
exception_test=None):
+ """Call function until it returns without exception or timeout expires.
+ Double the delay for each retry up to max_delay.
+ Calls exception_test with any exception raised by function, exception_test
+ may itself raise an exception to terminate the retry.
+ Returns what function returns if it succeeds before timeout.
+ Raises last exception raised by function on timeout.
+ """
+ deadline = time.time() + timeout
+ while True:
+ try:
+ return function()
+ except Exception, e:
+ if exception_test: exception_test(e)
+ delay = retry_delay(deadline, timeout, delay, max_delay)
+ if delay is None: raise
+
+def wait_port(port, host="127.0.0.1", **retry_kwargs):
+ """Wait up to timeout for port (on host) to be connectable.
+ Takes same keyword arguments as retry to control the timeout"""
+ def check(e): # Only retry on connection refused
+ if not isinstance(e, socket.error) or not e.errno == 111: raise
+ s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ try: retry_exception(lambda: s.connect((host, port)), exception_test=check,
+ **retry_kwargs)
+ except Exception, e:
+ raise Exception("wait_port timeout on %s:%s: %s"%(host, port, e))
+
+ finally: s.close()
+
+def wait_ports(ports, host="127.0.0.1", **retry_kwargs):
+ """Wait up to timeout for all ports (on host) to be connectable.
+ Takes same keyword arguments as retry to control the timeout"""
+ for p in ports: wait_port(p)
+
+class TestPopen(subprocess.Popen):
+ """Popen that can be torn down at the end of a TestCase and stores its
output."""
+
+ def __init__(self, name, args, **kwargs):
+ self.out = open(name+".out", 'w')
+ self.torndown = False
+ super(TestPopen, self).__init__(args, stdout=self.out,
stderr=subprocess.STDOUT, **kwargs)
+
+ def assert_running(self): assert self.poll() is None, "%s exited"%name
+
+ def __del__(self):
+ subprocess.Popen.__del__(self)
+ self.teardown()
+
+ def teardown(self):
+ if not self.torndown:
+ self.torndown = True
+ self.kill() # FIXME aconway 2014-03-27: check expectations
+ self.out.close()
+
+class Config(object):
+ """Base class for configuration objects that provide a convenient
+ way to create content for configuration files."""
+
+ def write(self, name, suffix=".conf"):
+ """Write the config object to file name.suffix. Returns name.suffix."""
+ name = name+suffix
+ with open(name,'w') as f: f.write(str(self))
+ return name
+
+
+class Qdrouterd(TestPopen):
+ """Run a Qpid Dispatch Router Daemon"""
+
+ class Config(list, Config):
+ """List of ('section', {'name':'value',...}).
+ Fills in some default values automatically, see Qdrouterd.DEFAULTS
+ """
+
+ DEFAULTS = {'listener':{'sasl-mechanisms':'ANONYMOUS'},
+
'connector':{'sasl-mechanisms':'ANONYMOUS','role':'on-demand'}}
+
+ def sections(self, name):
+ """Return list of sections named name"""
+ return [p for n,p in self if n == name]
+
+ def _defs(self, name, props):
+ """Fill in defaults for required values"""
+ if not name in Qdrouterd.Config.DEFAULTS: return props
+ p = copy(Qdrouterd.Config.DEFAULTS[name])
+ p.update(props);
+ return p
+
+ def __str__(self):
+ """Generate config file content. Fills in defaults for some
require values"""
+ def props(p): return "".join([" %s: %s\n"%(k,v) for k,v in
p.iteritems()])
+ return "".join(["%s {\n%s}\n"%(n,props(self._defs(n,p))) for n,p
in self])
+
+ def __init__(self, name, config, **kwargs):
+ self.config = copy(config)
+ super(Qdrouterd, self).__init__(name, ['qdrouterd', '-c',
config.write(name)])
+
+ @property
+ def ports(self):
+ """Return list of configured ports for all listeners"""
+ return [ l['port'] for l in self.config.sections('listener') ]
+
+ @property
+ def addresses(self):
+ """Return host:port addresses for all listeners"""
+ return [ "%s:%s"%(l['addr'],l['port']) for l in
self.config.sections('listener') ]
+
+ @property
+ def address(self):
+ """Return address of the first listener"""
+
+
+
+class Qpidd(TestPopen):
+ """Run a Qpid Daemon"""
+
+ class Config(dict, Config):
+
+ def __str__(self):
+ return "".join(["%s=%s\n"%(k,v) for k,v in self.iteritems()])
+
+
+ def __init__(self, name, config):
+ self.config = Qpidd.Config(
+ {'auth':'no',
+ 'log-to-stderr':'false', 'log-to-file':name+".log",
+ 'data-dir':name+".data"})
+ self.config.update(config)
+ super(Qpidd, self).__init__(name, ['qpidd', '--config',
self.config.write(name)]),
+ self.port = self.config['port'] or 5672
+ self.address = "127.0.0.1:%s"%self.port
+ self._agent = None
+
+ def qm_connect(self):
+ """Make a qpid_messaging connection to the broker"""
+ qm.Connection.establish(self.address)
+
+ @property
+ def agent(self, **kwargs):
+ if not self._agent: self._agent =
qpidtoollibs.BrokerAgent(self.qm_connect())
+ return self._agent
+
+
+
+class Messenger(proton.Messenger):
+ """Minor additions to Messenger for tests"""
+
+ def subscribe(self, source):
+ """proton.Messenger.subscribe and work till subscription is visible."""
+ t = proton.Messenger.subscribe(self, source)
+ while self.work(0.01): pass
+ return t
+
+class TestCase(unittest.TestCase):
+ """A test case that creates a separate directory for each test and
+ cleans up during teardown."""
+
+ def setUp(self):
+ self.save_dir = os.getcwd()
+ # self.id() is normally _module[.module].TestClass.test_name
+ id = self.id().split(".")
+ if len(id) == 1: # Not the expected format, just use dir = id.
+ dir = id[0]
+ else: # use dir = module[.module].TestClass/test_name
+ dir = os.path.join(".".join(id[0:-1]), id[-1])
+ shutil.rmtree(dir, ignore_errors=True) # FIXME aconway 2014-03-27:
wrong place
+ os.makedirs(dir)
+ os.chdir(dir)
+ self.cleanup_list = []
+ # FIXME aconway 2014-04-29: need a safer (configurable?) way to pick
ports.
+ self.next_port = random.randint(30000,40000)
+
+ def tearDown(self):
+ os.chdir(self.save_dir)
+ self.cleanup_list.reverse()
+ for t in self.cleanup_list:
+ for m in ["teardown", "tearDown", "stop", "close"]:
+ a = getattr(t, m, None)
+ if a: a(); break
+
+ def cleanup(self, x): self.cleanup_list.append(x); return x
+
+ def get_port(self):
+ """Get a (hopefully) unused port"""
+ p = self.next_port;
+ self.next_port += 1;
+ return p
+
+ def qdrouterd(self, *args, **kwargs):
+ """Return a Qdrouterd that will be cleaned up on teardown"""
+ return self.cleanup(Qdrouterd(*args, **kwargs))
+
+ def qpidd(self, *args, **kwargs):
+ """Return a Qpidd that will be cleaned up on teardown"""
+ return self.cleanup(Qpidd(*args, **kwargs))
+
+ def messenger(self, name="test-messenger"):
+ """Return a started Messenger that will be cleaned up on teardown."""
+ m = Messenger(name)
+ m.timeout = 1
+ m.start()
+ self.cleanup(m)
+ return m
+
+ def message(self, **properties):
+ """Convenience to create a proton.Message with properties set"""
+ m = Message()
+ for name, value in properties.iteritems(): setattr(m, name, value)
+ return m
Propchange: qpid/dispatch/trunk/tests/system_test.py
------------------------------------------------------------------------------
svn:eol-style = native
Added: qpid/dispatch/trunk/tests/system_tests_broker.py
URL:
http://svn.apache.org/viewvc/qpid/dispatch/trunk/tests/system_tests_broker.py?rev=1591167&view=auto
==============================================================================
--- qpid/dispatch/trunk/tests/system_tests_broker.py (added)
+++ qpid/dispatch/trunk/tests/system_tests_broker.py Wed Apr 30 01:13:24 2014
@@ -0,0 +1,89 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+"""System tests involving one or more brokers and dispatch routers
+
+FIXME aconway 2014-04-29:
+
+These tests is a work in progress, they do not pass
+and they are not run by the qdtest script.
+
+They are provided as an example of how to use the system_test module.
+
+To run the tests from a dispatch checkout:
+ . config.sh; python tests/system_tests_broker.py
+Note the tests wil
+"""
+
+from system_test import *
+
+class BrokerSystemTest(TestCase):
+
+ def test_broker(self):
+
+
+ testq = 'testq'
+
+ # Start two qpidd brokers called qpidd0 and qpidd1
+ qpidd = [
+ self.qpidd('qpidd%s'%i,
+ Qpidd.Config({'port':self.get_port(), 'trace':1}))
+ for i in xrange(2) ]
+
+ # Start a qdrouterd
+ router_conf = Qdrouterd.Config([
+ ('container', {'container-name':self.id()}),
+ ('router', { 'mode': 'standalone', 'router-id': self.id() }),
+ ('listener', {'addr':'0.0.0.0', 'port':self.get_port()}),
+ ('connector', {'name':'qpidd0', 'addr':'localhost',
'port':qpidd[0].port}),
+ ('connector', {'name':'qpidd1', 'addr':'localhost',
'port':qpidd[1].port}),
+ ('waypoint', {'name':testq, 'in-phase':0, 'out-phase':1,
'connector':'qpidd0'})
+ ])
+ router = self.qdrouterd('router0', router_conf)
+
+ # Wait for broker & router to be ready
+ wait_ports([q.port for q in qpidd] + router.ports)
+
+ # FIXME aconway 2014-03-27: smoke test for qpidd
+ qc = self.cleanup(qm.Connection.establish(qpidd[0].address))
+ qc.session().sender(testq+";{create:always}").send("a")
+ qr = qc.session().receiver(testq)
+ self.assertEqual(qr.fetch(1).content, "a")
+
+ # FIXME aconway 2014-03-28: smoke test for dispatch routing via queue
+ qaddr = router.addresses[0]+"/"+testq
+ m = self.message(address=qaddr, body="b")
+ mr = self.messenger()
+ mr.put(m)
+ mr.send()
+
+ # FIXME aconway 2014-03-28: check direct on broker
+
self.assertEqual(qc.session().receiver(testq).fetch(timeout=1).content, "b")
+ #self.assertEqual(sq.receiver(testq).fetch(timeout=1).content, "FOO")
+
+ # FIXME aconway 2014-03-28: subscribing first overshadows the waypoint?
+ m2 = self.messenger()
+ m2.subscribe(qaddr)
+ time.sleep(1)
+ m = Message()
+ m2.recv(1)
+ m2.get(m)
+ self.assertEqual(m.body, "b")
+
+if __name__ == '__main__': unittest.main()
Propchange: qpid/dispatch/trunk/tests/system_tests_broker.py
------------------------------------------------------------------------------
svn:eol-style = native
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]