Author: tross
Date: Thu Jan  7 20:32:09 2010
New Revision: 897007

URL: http://svn.apache.org/viewvc?rev=897007&view=rev
Log:
QPID-2327 - Enhance qpid-config to deal with xml and headers brokers
Committed patch from John Dunning

Added:
    qpid/trunk/qpid/cpp/src/tests/test.xquery
Modified:
    qpid/trunk/qpid/cpp/src/tests/cli_tests.py
    qpid/trunk/qpid/cpp/src/tests/run_cli_tests
    qpid/trunk/qpid/python/commands/qpid-config

Modified: qpid/trunk/qpid/cpp/src/tests/cli_tests.py
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cli_tests.py?rev=897007&r1=897006&r2=897007&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/cli_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/cli_tests.py Thu Jan  7 20:32:09 2010
@@ -99,6 +99,116 @@
                 found = True
         self.assertEqual(found, False)
 
+        # helpers for some of the test methods
+    def helper_find_exchange(self, xchgname, typ, expected=True):
+        xchgs = self.qmf.getObjects(_class = "exchange")
+        found = False
+        for xchg in xchgs:
+            if xchg.name == xchgname:
+                if typ:
+                    self.assertEqual(xchg.type, typ)
+                found = True
+        self.assertEqual(found, expected)
+
+    def helper_create_exchange(self, xchgname, typ="direct", opts=""):
+        foo = self.command(opts + " add exchange " + typ + " " + xchgname)
+        # print foo
+        ret = os.system(foo)
+        self.assertEqual(ret, 0)
+        self.helper_find_exchange(xchgname, typ, True)
+
+    def helper_destroy_exchange(self, xchgname):
+        foo = self.command(" del exchange " + xchgname)
+        # print foo
+        ret = os.system(foo)
+        self.assertEqual(ret, 0)
+        self.helper_find_exchange(xchgname, False, expected=False)
+
+    def helper_find_queue(self, qname, expected=True):
+        queues = self.qmf.getObjects(_class="queue")
+        found = False
+        for queue in queues:
+            if queue.name == qname:
+                self.assertEqual(queue.durable, False)
+                found = True
+        self.assertEqual(found, expected)
+
+    def helper_create_queue(self, qname):
+        foo = self.command(" add queue " + qname)
+        # print foo
+        ret = os.system(foo)
+        self.assertEqual(ret, 0)
+        self.helper_find_queue(qname, True)
+
+    def helper_destroy_queue(self, qname):
+        foo = self.command(" del queue " + qname)
+        # print foo
+        ret = os.system(foo)
+        self.assertEqual(ret, 0)
+        self.helper_find_queue(qname, False)
+
+
+        # test the bind-queue-to-header-exchange functionality
+    def test_qpid_config_headers(self):
+        self.startQmf();
+        qmf = self.qmf
+        qname = "test_qpid_config"
+        xchgname = "test_xchg"
+
+        # first create a header xchg
+        self.helper_create_exchange(xchgname, typ="headers")
+
+        # create the queue
+        self.helper_create_queue(qname)
+
+        # now bind the queue to the xchg
+        foo = self.command(" bind " + xchgname + " " + qname + 
+                                     " key all foo=bar baz=quux")
+        # print foo
+        ret = os.system(foo)
+        self.assertEqual(ret, 0)
+
+        # he likes it, mikey.  Ok, now tear it all down.  first the binding
+        ret = os.system(self.command(" unbind " + xchgname + " " + qname +
+                                     " key"))
+        self.assertEqual(ret, 0)
+
+        # then the queue 
+        self.helper_destroy_queue(qname)
+
+        # then the exchange
+        self.helper_destroy_exchange(xchgname)
+
+        # test the bind-queue-xml-filter functionality
+    def test_qpid_config_xml(self):
+        self.startQmf();
+        qmf = self.qmf
+        qname = "test_qpid_config"
+        xchgname = "test_xchg"
+
+        # first create a header xchg
+        self.helper_create_exchange(xchgname, typ="xml")
+
+        # create the queue
+        self.helper_create_queue(qname)
+
+        # now bind the queue to the xchg
+        foo = self.command("-f test.xquery bind " + xchgname + " " + qname)
+        # print foo
+        ret = os.system(foo)
+        self.assertEqual(ret, 0)
+
+        # he likes it, mikey.  Ok, now tear it all down.  first the binding
+        ret = os.system(self.command(" unbind " + xchgname + " " + qname +
+                                     " key"))
+        self.assertEqual(ret, 0)
+
+        # then the queue 
+        self.helper_destroy_queue(qname)
+
+        # then the exchange
+        self.helper_destroy_exchange(xchgname)
+
     def test_qpid_config_durable(self):
         self.startQmf();
         qmf = self.qmf

Modified: qpid/trunk/qpid/cpp/src/tests/run_cli_tests
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/run_cli_tests?rev=897007&r1=897006&r2=897007&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/run_cli_tests (original)
+++ qpid/trunk/qpid/cpp/src/tests/run_cli_tests Thu Jan  7 20:32:09 2010
@@ -26,10 +26,39 @@
 
 trap stop_brokers INT TERM QUIT
 
+# helper function to create test.xquery in the current directory, so
+# that the python test program can find it.  yes, it leaves a turd.
+create_test_xquery() {
+    # Write the sample query to ./test.xquery.  The quoted 'EOF' delimiter
+    # disables expansion inside the here-document, so the $w variables of
+    # the query are written out literally without needing backslashes.
+    cat > ./test.xquery <<'EOF'
+    let $w := ./weather
+    return $w/station = 'Raleigh-Durham International Airport (KRDU)'
+       and $w/temperature_f > 50
+       and $w/temperature_f - $w/dewpoint > 5
+       and $w/wind_speed_mph > 7
+       and $w/wind_speed_mph < 20
+EOF
+}
+
 start_brokers() {
-    ../qpidd --daemon --port 0 --no-data-dir --no-module-dir --auth no > 
qpidd.port
+    # if the xml lib is present, use it.  if not, disable any tests which
+    # look like they're xml related.
+    # if we start supporting xml on windows, it will need something similar 
+    # here
+    if [ -f ../.libs/xml.so ] ; then
+       xargs="--load-module ../.libs/xml.so"
+       if [ ! -f test.xquery ] ; then 
+           create_test_xquery
+       fi
+       targs=""
+    else
+       echo "Ignoring XML tests"
+       xargs=""
+       targs="--ignore=*xml*"
+    fi
+
+    ../qpidd --daemon --port 0 --no-data-dir --no-module-dir --auth no $xargs 
> qpidd.port
     LOCAL_PORT=`cat qpidd.port`
-    ../qpidd --daemon --port 0 --no-data-dir --no-module-dir --auth no > 
qpidd.port
+    ../qpidd --daemon --port 0 --no-data-dir --no-module-dir --auth no $xargs 
> qpidd.port
     REMOTE_PORT=`cat qpidd.port`
 }
 
@@ -41,7 +70,7 @@
 if test -d ${PYTHON_DIR} ;  then
     start_brokers
     echo "Running CLI tests using brokers on ports $LOCAL_PORT $REMOTE_PORT"
-    $PYTHON_COMMANDS/qpid-python-test -m cli_tests -b localhost:$LOCAL_PORT 
-Dremote-port=$REMOTE_PORT -Dcli-dir=$CLI_DIR $@
+    $PYTHON_COMMANDS/qpid-python-test -m cli_tests -b localhost:$LOCAL_PORT 
-Dremote-port=$REMOTE_PORT -Dcli-dir=$CLI_DIR $targs $@
     RETCODE=$?
     stop_brokers
     if test x$RETCODE != x0; then 

Added: qpid/trunk/qpid/cpp/src/tests/test.xquery
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/test.xquery?rev=897007&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/test.xquery (added)
+++ qpid/trunk/qpid/cpp/src/tests/test.xquery Thu Jan  7 20:32:09 2010
@@ -0,0 +1,6 @@
+   let $w := ./weather
+   return $w/station = 'Raleigh-Durham International Airport (KRDU)'
+      and $w/temperature_f > 50
+      and $w/temperature_f - $w/dewpoint > 5
+      and $w/wind_speed_mph > 7
+      and $w/wind_speed_mph < 20

Modified: qpid/trunk/qpid/python/commands/qpid-config
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/python/commands/qpid-config?rev=897007&r1=897006&r2=897007&view=diff
==============================================================================
--- qpid/trunk/qpid/python/commands/qpid-config (original)
+++ qpid/trunk/qpid/python/commands/qpid-config Thu Jan  7 20:32:09 2010
@@ -24,6 +24,7 @@
 import sys
 import locale
 from qmf.console import Session
+import io
 
 _recursive         = False
 _host              = "localhost"
@@ -43,6 +44,7 @@
 _msgSequence       = False
 _ive               = False
 _eventGeneration   = None
+_file              = None
 
 FILECOUNT = "qpid.file_count"
 FILESIZE  = "qpid.file_size"
@@ -65,6 +67,8 @@
     print "        qpid-config [OPTIONS] add queue <name> [AddQueueOptions]"
     print "        qpid-config [OPTIONS] del queue <name> [DelQueueOptions]"
     print "        qpid-config [OPTIONS] bind   <exchange-name> <queue-name> 
[binding-key]"
+    print "                  <for type xml>     [-f -|filename]"
+    print "                  <for type header>  [all|any] k1=v1 [, k2=v2...]"
     print "        qpid-config [OPTIONS] unbind <exchange-name> <queue-name> 
[binding-key]"
     print
     print "Options:"
@@ -134,6 +138,46 @@
     print
     sys.exit (1)
 
+
+#
+# helpers for the arg parsing in bind().  return multiple values; "ok"
+# followed by the resultant args
+
+#
+# accept -f followed by either
+# a filename or "-", for stdin.  pull the bits into a string, to be 
+# passed to the xml binding.
+#
+def snarf_xquery_args():
+    if not _file:
+        print "Invalid args to bind xml:  need an input file or stdin"
+        return [False]
+    if _file == "-":
+        res = sys.stdin.read()
+    else:
+        f = io.open(_file)   # let this signal if it can't find it
+        res = f.read()
+        f.close()
+    return [True, res]
+    
+#
+# look for "any"/"all" and grok the rest of argv into a map
+#
+def snarf_header_args(cargs):
+    if len(cargs) < 2:
+        print "Invalid args to bind headers:  need 'any'/'all' plus conditions"
+        return [False]
+    op = cargs[0]
+    if op == "all" or op == "any":
+        kv = {}
+        for thing in cargs[1:]:
+            k_and_v = thing.split("=")
+            kv[k_and_v[0]] = k_and_v[1]
+        return [True, op, kv]
+    else:
+        print "Invalid condition arg to bind headers, need 'any' or 'all', not 
'" + op + "'"
+        return [False]
+
 class BrokerManager:
     def __init__ (self):
         self.brokerName = None
@@ -344,7 +388,36 @@
         key   = ""
         if len (args) > 2:
             key = args[2]
-        self.broker.getAmqpSession().exchange_bind (queue=qname, 
exchange=ename, binding_key=key)
+
+        # query the exchange to determine its type.
+        res = self.broker.getAmqpSession().exchange_query(ename)
+
+        # type of the xchg determines the processing of the rest of
+        # argv.  if it's an xml xchg, we want to find a file
+        # containing an x-query, and pass that.  if it's a headers
+        # exchange, we need to pass either "any" or all, followed by a
+        # map containing key/value pairs.  if neither of those, extra
+        # args are ignored.
+        ok = True
+        args = None
+        if res.type == "xml":
+            # this checks/imports the -f arg
+            [ok, xquery] = snarf_xquery_args()
+            args = { "xquery" : xquery }
+            # print args
+        else:
+            if res.type == "headers":
+                [ok, op, kv] = snarf_header_args(cargs[4:])
+                args = kv
+                args["x-match"] = op
+
+        if not ok:
+            sys.exit(1)
+
+        self.broker.getAmqpSession().exchange_bind (queue=qname,
+                                                    exchange=ename,
+                                                    binding_key=key,
+                                                    arguments=args)
 
     def Unbind (self, args):
         if len (args) < 2:
@@ -383,8 +456,8 @@
     longOpts = ("durable", "cluster-durable", "bindings", "broker-addr=", 
"file-count=",
                 "file-size=", "max-queue-size=", "max-queue-count=", 
"limit-policy=",
                 "order=", "sequence", "ive", "generate-queue-events=", 
"force", "force-if-not-empty",
-                "force_if_used", "alternate-exchange=", "passive", "timeout=")
-    (optlist, encArgs) = getopt.gnu_getopt (sys.argv[1:], "a:b", longOpts)
+                "force_if_used", "alternate-exchange=", "passive", "timeout=", 
"file=")
+    (optlist, encArgs) = getopt.gnu_getopt (sys.argv[1:], "a:bf:", longOpts)
 except:
     Usage ()
 
@@ -399,6 +472,8 @@
         _recursive = True
     if opt[0] == "-a" or opt[0] == "--broker-addr":
         _host = opt[1]
+    if opt[0] == "-f" or opt[0] == "--file":
+        _file = opt[1]
     if opt[0] == "--timeout":
         _connTimeout = int(opt[1])
         if _connTimeout == 0:
@@ -488,6 +563,9 @@
             Usage ()
 except KeyboardInterrupt:
     print
+except IOError, e:
+    print e
+    sys.exit(1)
 except Exception,e:
     print "Failed: %s: %s" % (e.__class__.__name__, e)
     sys.exit(1)



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]

Reply via email to