Author: rajith
Date: Wed Nov 11 03:28:39 2009
New Revision: 834754

URL: http://svn.apache.org/viewvc?rev=834754&view=rev
Log:
testkit.py provides the plumbing for running longer-duration tests using the 
multi-broker framework defined in brokertest.py.
For the time being testkit carries its own copy of brokertest.py. The goal 
is to use the one available under the /python folder as soon as possible.

Testkit is intended to be run as:
1) an ant target via "ant testkit" (to allow automated testing)
2) standalone against a release

If running standalone, you need to have the qpid/python files on the Python 
path, and $QP_CP should be set to the classpath that contains the qpid jars. 
Assuming $PYTHON_DIR points to the python folder, you can run it as follows:

$PYTHON_DIR/qpid-python-test -m testkit
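
The standalone prerequisites above can be checked before launching. This is 
a minimal sketch in current Python 3, not code from this commit: 
standalone_command is a hypothetical helper, and reading $PYTHON_DIR from the 
environment is an assumption. Only $QP_CP, $PYTHON_DIR and the 
"qpid-python-test -m testkit" invocation come from the log message.

```python
import os


def standalone_command(env):
    """Build the standalone testkit command line from an environment mapping.

    Hypothetical helper for illustration only; it is not part of testkit.py.
    """
    # $QP_CP must hold the classpath that contains the qpid jars.
    if not env.get("QP_CP"):
        raise RuntimeError("Environment variable QP_CP is not set")
    # $PYTHON_DIR is assumed to point at the qpid python folder.
    python_dir = env.get("PYTHON_DIR")
    if not python_dir:
        raise RuntimeError("Environment variable PYTHON_DIR is not set")
    return [os.path.join(python_dir, "qpid-python-test"), "-m", "testkit"]
```

Passing os.environ as env gives the command for the current shell. The 
qpid/python files still have to be on the Python path for the run itself to 
work, which this sketch does not check.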

The ant target is currently not operational, as there seem to be a few issues 
when running under jython.

Tests
=========
Currently only 3 tests are added.
1. test_multiplexing_con
2. test_multiplexing_con_tx
3. test_failover

All tests use the generic Sender and Receiver via the TestLauncher 
checked in under the testkit module.
Currently there are occasional test failures for test_multiplexing_con_tx.
The 'test_failover' test is currently failing due to a known bug.


Added:
    qpid/trunk/qpid/java/testkit/brokertest.py
    qpid/trunk/qpid/java/testkit/testkit.py   (with props)
Modified:
    qpid/trunk/qpid/java/build.xml

Modified: qpid/trunk/qpid/java/build.xml
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/build.xml?rev=834754&r1=834753&r2=834754&view=diff
==============================================================================
--- qpid/trunk/qpid/java/build.xml (original)
+++ qpid/trunk/qpid/java/build.xml Wed Nov 11 03:28:39 2009
@@ -233,4 +233,13 @@
       </exec>
   </target>
 
+  <target name="testkit" depends="build,compile-tests">
+    <jython path="${mllib.dir}">
+      <args>
+        <arg value="${mllib.dir}/qpid-python-test"/>
+        <arg value="-m ${basedir}/testkit/testkit"/>
+      </args>
+    </jython>
+  </target>
+
 </project>

Added: qpid/trunk/qpid/java/testkit/brokertest.py
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/testkit/brokertest.py?rev=834754&view=auto
==============================================================================
--- qpid/trunk/qpid/java/testkit/brokertest.py (added)
+++ qpid/trunk/qpid/java/testkit/brokertest.py Wed Nov 11 03:28:39 2009
@@ -0,0 +1,324 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Support library for tests that start multiple brokers, e.g. cluster
+# or federation
+
+import os, signal, string, tempfile, popen2, socket, threading, time
+import qpid
+from qpid import connection, messaging, util
+from qpid.harness import Skipped
+from unittest import TestCase
+from copy import copy
+from threading import Thread, Lock, Condition
+from shutil import rmtree 
+
+# Values for expected outcome of process at end of test
+EXPECT_EXIT_OK=1           # Expect to exit with 0 status before end of test.
+EXPECT_EXIT_FAIL=2         # Expect to exit with non-0 status before end of test.
+EXPECT_RUNNING=3           # Expect to still be running at end of test
+    
+def is_running(pid):
+    try:
+        os.kill(pid, 0)
+        return True
+    except:
+        return False
+
+class Unexpected(Exception):
+    pass
+
+class Popen(popen2.Popen3):
+    """
+    Similar to subprocess.Popen but using popen2 classes for portability.
+    Can set and verify expectation of process status at end of test.
+    """
+
+    def __init__(self, cmd, expect=EXPECT_EXIT_OK):
+        self.cmd  = [ str(x) for x in cmd ]
+        popen2.Popen3.__init__(self, self.cmd, True)
+        self.expect = expect
+        self.stdin = self.tochild
+        self.stdout = self.fromchild
+        self.stderr = self.childerr
+
+    def unexpected(self,msg):
+        raise Unexpected("%s: %s\n--stdout:\n%s\n--stderr:\n%s" %
+                         (msg, self.cmd_str(), self.stdout.read(), self.stderr.read()))
+    
+    def testend(self):                  # Clean up at end of test.
+        if self.expect == EXPECT_RUNNING:
+            try:
+                self.kill()
+            except:
+                self.unexpected("Expected running but exited %d" % self.wait())
+        else:
+            # Give the process some time to exit.
+            delay = 0.1
+            while (self.poll() is None and delay < 1):
+                time.sleep(delay)
+                delay *= 2
+            if self.returncode is None: # Still haven't stopped
+                self.kill()
+                self.unexpected("Expected to exit but still running")
+            elif self.expect == EXPECT_EXIT_OK and self.returncode != 0:
+                self.unexpected("Expected exit ok but exited %d" % self.returncode)
+            elif self.expect == EXPECT_EXIT_FAIL and self.returncode == 0:
+                self.unexpected("Expected to fail but exited ok")
+               
+    def communicate(self, input=None):
+        if input:
+            self.stdin.write(input)
+            self.stdin.close()
+        outerr = (self.stdout.read(), self.stderr.read())
+        self.wait()
+        return outerr
+
+    def is_running(self): return is_running(self.pid)
+
+    def poll(self):
+        self.returncode = popen2.Popen3.poll(self)
+        if (self.returncode == -1): self.returncode = None
+        return self.returncode
+
+    def wait(self):
+        self.returncode = popen2.Popen3.wait(self)
+        return self.returncode
+
+    def send_signal(self, sig):
+        os.kill(self.pid,sig)
+        self.wait()
+
+    def terminate(self): self.send_signal(signal.SIGTERM)
+    def kill(self): self.send_signal(signal.SIGKILL)
+        
+        
+
+    def cmd_str(self): return " ".join([str(s) for s in self.cmd])
+
+def checkenv(name):
+    value = os.getenv(name)
+    if not value: raise Exception("Environment variable %s is not set" % name)
+    return value
+
+class Broker(Popen):
+    "A broker process. Takes care of start, stop and logging."
+    _store_lib = os.getenv("STORE_LIB")
+    _qpidd = checkenv("QPIDD_EXEC")
+    _broker_count = 0
+
+    def __init__(self, test, args=[], name=None, expect=EXPECT_RUNNING):
+        """Start a broker daemon. name determines the data-dir and log
+        file names."""
+
+        self.test = test
+        cmd = [self._qpidd, "--port=0", "--no-module-dir", "--auth=no"] + args
+        if name: self.name = name
+        else:
+            self.name = "broker%d" % Broker._broker_count
+            Broker._broker_count += 1
+        self.log = os.path.join(test.dir, self.name+".log")
+        cmd += ["--log-to-file", self.log, "--log-prefix", self.name,"--log-to-stderr=no"]
+        self.datadir = os.path.join(test.dir, self.name)
+        cmd += ["--data-dir", self.datadir, "-t"]
+        if self._store_lib: cmd += ["--load-module", self._store_lib]
+
+        Popen.__init__(self, cmd, expect)
+        try: self.port = int(self.stdout.readline())
+        except Exception:
+            raise Exception("Failed to start broker: "+self.cmd_str())
+        test.cleanup_popen(self)
+        self.host = "localhost"         # Placeholder for remote brokers.
+
+    def connect(self):
+        """New API connection to the broker."""
+        return messaging.Connection.open(self.host, self.port)
+
+    def connect_old(self):
+        """Old API connection to the broker."""
+        socket = qpid.util.connect(self.host,self.port)
+        connection = qpid.connection.Connection (sock=socket)
+        connection.start()
+        return connection;
+
+    def declare_queue(self, queue):
+        c = self.connect_old()
+        s = c.session(str(qpid.datatypes.uuid4()))
+        s.queue_declare(queue=queue)
+        c.close()
+
+class Cluster:
+    """A cluster of brokers in a test."""
+
+    _cluster_lib = checkenv("CLUSTER_LIB")
+    _cluster_count = 0
+
+    def __init__(self, test, count=0, args=[], expect=EXPECT_RUNNING):
+        self.test = test
+        self._brokers=[]
+        self.name = "cluster%d" % Cluster._cluster_count
+        Cluster._cluster_count += 1
+        # Use unique cluster name
+        self.args = copy(args)
+        self.args += [ "--cluster-name", "%s-%s:%d" % (self.name, socket.gethostname(), os.getpid()) ]
+        self.args += [ "--load-module", self._cluster_lib ]
+        self.start_n(count, expect=expect)
+
+    def start(self, name=None, expect=EXPECT_RUNNING):
+        """Add a broker to the cluster. Returns the index of the new broker."""
+        if not name: name="%s-%d" % (self.name, len(self._brokers))
+        self._brokers.append(self.test.broker(self.args, name, expect))
+        return self._brokers[-1]
+
+    def start_n(self, count, expect=EXPECT_RUNNING):
+        for i in range(count): self.start(expect=expect)
+
+    def wait(self):
+        """Wait for all cluster members to be ready"""
+        for b in self._brokers:
+            b.connect().close()
+
+    # Behave like a list of brokers.
+    def __len__(self): return len(self._brokers)
+    def __getitem__(self,index): return self._brokers[index]
+    def __iter__(self): return self._brokers.__iter__()
+
+class BrokerTest(TestCase):
+    """
+    Tracks processes started by test and kills at end of test.
+    Provides a well-known working directory for each test.
+    """
+
+    # FIXME aconway 2009-11-05: too many env vars, need a simpler
+    # scheme for locating exes and libs
+
+    cluster_lib = os.getenv("CLUSTER_LIB")
+    xml_lib = os.getenv("XML_LIB")
+    qpidConfig_exec = os.getenv("QPID_CONFIG_EXEC")
+    qpidRoute_exec = os.getenv("QPID_ROUTE_EXEC")
+    receiver_exec = os.getenv("RECEIVER_EXEC")
+    sender_exec = os.getenv("SENDER_EXEC")
+
+    def setUp(self):
+        self.dir = os.path.join("brokertest.tmp", self.id())
+        if os.path.exists(self.dir):
+            rmtree(self.dir)             
+        os.makedirs(self.dir)
+        self.popens = []
+
+    def tearDown(self):
+        err = []
+        for p in self.popens:
+            try: p.testend()
+            except Unexpected, e: err.append(str(e))
+        if err: raise Exception("\n".join(err))
+
+    # FIXME aconway 2009-11-06: check for core files of exited processes.
+    
+    def cleanup_popen(self, popen):
+        """Add process to be killed at end of test"""
+        self.popens.append(popen)
+
+    def popen(self, cmd, expect=EXPECT_EXIT_OK):
+        """Start a process that will be killed at end of test"""
+        p = Popen(cmd, expect)
+        self.cleanup_popen(p)
+        return p
+
+    def broker(self, args=[], name=None, expect=EXPECT_RUNNING):
+        """Create and return a broker ready for use"""
+        b = Broker(self, args, name, expect)
+        b.connect().close()
+        return b
+
+    def cluster(self, count=0, args=[], expect=EXPECT_RUNNING):
+        """Create and return a cluster ready for use"""
+        cluster = Cluster(self, count, args, expect=expect)
+        cluster.wait()
+        return cluster
+
+class StoppableThread(Thread):
+    """
+    Base class for threads that do something in a loop and periodically check
+    to see if they have been stopped.
+    """
+    def __init__(self):
+        self.stopped = False
+        self.error = None
+        Thread.__init__(self)
+
+    def stop(self):
+        self.stopped = True
+        self.join()
+        if self.error: raise self.error
+    
+class Sender(StoppableThread):
+    """
+    Thread to run a sender client and send numbered messages until stopped.
+    """
+
+    def __init__(self, broker):
+        StoppableThread.__init__(self)
+        self.sender = broker.test.popen(
+            [broker.test.sender_exec, "--port", broker.port], expect=EXPECT_RUNNING)
+
+    def run(self):
+        try:
+            self.sent = 0
+            while not self.stopped:
+                self.sender.stdin.write(str(self.sent)+"\n")
+                self.sender.stdin.flush()
+                self.sent += 1
+        except Exception, e: self.error = e
+
+class Receiver(Thread):
+    """
+    Thread to run a receiver client and verify it receives
+    sequentially numbered messages.
+    """
+    def __init__(self, broker):
+        Thread.__init__(self)
+        self.test = broker.test
+        self.receiver = self.test.popen(
+            [self.test.receiver_exec, "--port", broker.port], expect=EXPECT_RUNNING)
+        self.stopat = None
+        self.lock = Lock()
+        self.error = None
+
+    def run(self):
+        try:
+            self.received = 0
+            while self.stopat is None or self.received < self.stopat:
+                self.lock.acquire()
+                try:
+                    self.test.assertEqual(self.receiver.stdout.readline(), str(self.received)+"\n")
+                    self.received += 1
+                finally:
+                    self.lock.release()
+        except Exception, e:
+            self.error = e
+
+    def stop(self, count):
+        """Returns when received >= count"""
+        self.lock.acquire()
+        self.stopat = count
+        self.lock.release()
+        self.join()
+        if self.error: raise self.error
+
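
The grace period that Popen.testend in brokertest.py gives a process before 
killing it is a poll loop with a doubling sleep. The same idea can be 
sketched in current Python 3 as below. This is an illustration under 
assumptions, not the committed code: wait_with_backoff and its parameter 
names are hypothetical, and proc can be any object with a poll() method and 
a returncode attribute, such as a subprocess.Popen instance (the commit 
itself uses popen2).

```python
import time


def wait_with_backoff(proc, first_delay=0.1, max_delay=1.0):
    """Poll proc until it exits or the sleep interval reaches max_delay.

    Returns the exit status, or None if the process is still running.
    Hypothetical helper; mirrors the loop in Popen.testend.
    """
    delay = first_delay
    # poll() is evaluated first, so returncode is fresh when the loop ends.
    while proc.poll() is None and delay < max_delay:
        time.sleep(delay)
        delay *= 2  # back off: 0.1, 0.2, 0.4, 0.8 with the defaults
    return proc.returncode
```

With the defaults the caller waits at most 1.5 seconds in total; a None 
result means the process did not stop in time, and the caller decides 
whether to kill it and report an error.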

Added: qpid/trunk/qpid/java/testkit/testkit.py
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/testkit/testkit.py?rev=834754&view=auto
==============================================================================
--- qpid/trunk/qpid/java/testkit/testkit.py (added)
+++ qpid/trunk/qpid/java/testkit/testkit.py Wed Nov 11 03:28:39 2009
@@ -0,0 +1,215 @@
+#!/usr/bin/env python
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import time, string
+from brokertest import *
+from qpid.messaging import *
+
+
+try:
+    import java.lang.System
+    _cp = java.lang.System.getProperty("java.class.path"); 
+except ImportError: 
+    _cp = checkenv("QP_CP")
+
+# The base test case has support for launching the genric
+# receiver and sender through the TestLauncher with all the options.
+# 
+class JavaClientTest(BrokerTest):
+    """Base Case for Java Test cases"""
+
+    client_class = "org.apache.qpid.testkit.TestLauncher" 
+
+    # currently there is no transparent reconnection.
+    # temp hack: just creating the queue here and closing it.
+    def start_error_watcher(self,broker=None):
+        ssn = broker.connect().session()
+        err_watcher = ssn.receiver("control {create:always}", capacity=1)
+        ssn.close()  
+
+    def client(self,**options):
+        cmd =  ["java","-cp",_cp] 
+        cmd += ["-Dtest_name=" + options.get("test_name", "UNKNOWN")]
+        cmd += ["-Dhost=" + options.get("host","127.0.0.1")]
+        cmd += ["-Dport=" + str(options.get("port",5672))]
+        cmd += ["-Dcon_count=" + str(options.get("con_count",1))]
+        cmd += ["-Dssn_count=" + str(options.get("ssn_count",1))]
+        cmd += ["-Dqueue_name=" + options.get("queue_name","queue")]
+        cmd += ["-Dexchange_name=" + options.get("exchange_name","amq.direct")]
+        cmd += ["-Drouting_key=" + options.get("routing_key","routing_key")]
+        cmd += ["-Dunique_dests=" + str(options.get("unique_dests",True))]
+        cmd += ["-Ddurable=" + str(options.get("durable",False))]
+        cmd += ["-Dtransacted=" + str(options.get("transacted",False))]
+        cmd += ["-Dreceiver=" + str(options.get("receiver",False))]
+        cmd += ["-Dsync_rcv=" + str(options.get("sync_rcv",False))]
+        cmd += ["-Dsender=" + str(options.get("sender",False))]
+        cmd += ["-Dmsg_size=" + str(options.get("msg_size",256))]
+        cmd += ["-Dtx_size=" + str(options.get("tx_size",10))]
+        cmd += ["-Dmsg_count=" + str(options.get("msg_count",10))]
+        cmd += ["-Dsleep_time=" + str(options.get("sleep_time",1000))]
+        cmd += ["-Dfailover=" + options.get("failover", "failover_exchange")]
+        cmd += ["-Dreliability=" + options.get("reliability", "exactly_once")]
+        cmd += [self.client_class]
+
+        print str(options.get("port",5672))  
+        return cmd
+
+    # currently there is no transparent reconnection.
+    # temp hack: just creating a receiver and closing session soon after.
+    def monitor_clients(self,broker=None,run_time=600,error_ck_freq=60):
+        ssn = broker.connect().session()
+        err_watcher = ssn.receiver("control {create:always}", capacity=1)
+        i = run_time/error_ck_freq
+        for j in range(i):            
+            try:   
+                m = err_watcher.fetch(timeout=error_ck_freq)
+                print self.check_for_error() 
+            except messaging.Empty, e:                
+                pass # do nothing
+        ssn.close()
+
+    def check_for_error(msg):
+        raise Exception("Error:%s \ntime:%s\ntrace:%s\n" %
+                         (msg.properties["desc"],
+                          msg.properties["time"],
+                          msg.properties["exception-trace"]
+                          ))
+
+    def terminate_and_capture_logs(self,popen, process_name):
+        popen.terminate()
+        log = os.path.join(self.dir, process_name+".out") 
+        f = open(log, 'w')
+        f.write(popen.stdout.read())
+        f.close() 
+
+        log = os.path.join(self.dir, process_name+".err") 
+        f = open(log, 'w')
+        f.write(popen.stderr.read())
+        f.close()
+
+    def verify(self, receiver,sender):
+        sender_running = receiver.is_running()
+        receiver_running = sender.is_running()
+
+        self.terminate_and_capture_logs(receiver,"receiver")
+        self.terminate_and_capture_logs(sender,"sender") 
+
+        self.assertTrue(receiver_running,"Receiver has exited prematually")
+        self.assertTrue(sender_running,"Sender has exited prematually")
+
+
+class ConcurrencyTest(JavaClientTest):
+    """A concurrency test suite for the JMS client"""
+
+    def test_multiplexing_con(self):
+        """Tests multiple sessions on a single connection""" 
+
+        cluster = Cluster(self, 2)
+        p = cluster[0].port
+     
+        self.start_error_watcher(broker=cluster[0])
+
+        receiver = self.popen(self.client(receiver=True,
+                                          ssn_count=25,
+                                          port=p,
+                                          test_name=self.id()),
+                              expect=EXPECT_EXIT_FAIL) 
+
+        sender = self.popen(self.client(sender=True,
+                                        ssn_count=25,
+                                        port=p,
+                                        test_name=self.id()),
+                              expect=EXPECT_EXIT_FAIL) 
+
+        self.monitor_clients(broker=cluster[0],run_time=60)
+        self.verify(receiver,sender)
+
+
+    def test_multiplexing_con_tx(self):
+        """Tests multiple transacted sessions on a single connection""" 
+
+        cluster = Cluster(self, 2)
+        ssn = cluster[0].connect().session()
+        p = cluster[0].port
+     
+        self.start_error_watcher(broker=cluster[0])
+
+        receiver = self.popen(self.client(receiver=True,
+                                          ssn_count=25,
+                                          port=p,
+                                          transacted=True,
+                                          test_name=self.id()),
+                              expect=EXPECT_EXIT_FAIL) 
+
+        sender = self.popen(self.client(sender=True,
+                                        ssn_count=25,
+                                        port=p,
+                                        transacted=True,
+                                        test_name=self.id()),
+                              expect=EXPECT_EXIT_FAIL) 
+
+        self.monitor_clients(broker=cluster[0],run_time=60)
+        ssn.close(); 
+        self.verify(receiver,sender)
+
+class SoakTest(JavaClientTest):
+    """A soak test suite for the JMS client"""
+
+    def test_failover(self):
+        cluster = self.cluster(4, expect=EXPECT_EXIT_FAIL)
+        p = cluster[0].port
+        self.start_error_watcher(broker=cluster[0])
+        receiver = self.popen(self.client(receiver=True,
+                                          ssn_count=1,
+                                          port=p,
+                                          reliability="at_least_once",
+                                          test_name=self.id()),
+                              expect=EXPECT_EXIT_FAIL) 
+
+        sender = self.popen(self.client(sender=True,
+                                        ssn_count=1,
+                                        port=p,
+                                        reliability="at_least_once",
+                                        test_name=self.id()),
+                              expect=EXPECT_EXIT_FAIL) 
+      
+        # grace period for java clients to get the failover properly setup.
+        time.sleep(30) 
+        error_msg=None
+        # Kill original brokers, start new ones.
+        try:
+            for i in range(4):
+                cluster[i].kill()
+                b=cluster.start()
+                self.monitor_clients(broker=b,run_time=30,error_ck_freq=30)
+        except ConnectError, e1:
+            error_msg = "Unable to connect to new cluster node"
+        except SessionError, e2:
+            error_msg = "Session error while connected to new cluster node"
+
+        # verify also captures out/err streams
+        self.verify(receiver,sender)
+        if error_msg:      
+            raise Exception(error_msg)            
+  
+if __name__ == '__main__':
+    if not test.main(): sys.exit(1)
+        

Propchange: qpid/trunk/qpid/java/testkit/testkit.py
------------------------------------------------------------------------------
    svn:executable = *
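
JavaClientTest.client in testkit.py turns keyword options into -Dname=value 
system properties for the Java TestLauncher, one line per option. The 
pattern can be sketched more compactly with a table of defaults. This is a 
hedged illustration in current Python 3: java_properties is a hypothetical 
helper, and rejecting unknown option names is an addition that the committed 
code does not make.

```python
def java_properties(defaults, **options):
    """Render options as -Dname=value JVM arguments, falling back to defaults.

    Hypothetical helper; the committed client() method spells each option out.
    """
    # Addition for illustration: catch misspelled option names early.
    unknown = sorted(set(options) - set(defaults))
    if unknown:
        raise ValueError("Unknown client options: %s" % ", ".join(unknown))
    # Dicts keep insertion order (Python 3.7+), so output order is stable.
    return ["-D%s=%s" % (name, options.get(name, default))
            for name, default in defaults.items()]
```

For example, with defaults of host "127.0.0.1", port 5672 and transacted 
False, calling java_properties(defaults, port=1234, transacted=True) yields 
the three arguments -Dhost=127.0.0.1, -Dport=1234 and -Dtransacted=True. 
Booleans are rendered as Python's "True"/"False", as in the committed code.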



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]
