Author: kwall
Date: Wed Nov  7 12:27:56 2012
New Revision: 1406584

URL: http://svn.apache.org/viewvc?rev=1406584&view=rev
Log:
QPID-4422: Python Client (0-8..0-9) now allows "instance" client property to be 
passed in order to allow re-subscribing to durable subscriptions. Centralised 
the creation of client properties such that this is only done in one place 
across all protocols. Also increased Python Client (0-8..0-9)'s diagnostic 
logging.

Added:
    qpid/trunk/qpid/python/qpid/tests/util.py
    qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_9/messageheader.py
Modified:
    qpid/trunk/qpid/python/qpid/client.py
    qpid/trunk/qpid/python/qpid/connection08.py
    qpid/trunk/qpid/python/qpid/delegates.py
    qpid/trunk/qpid/python/qpid/messaging/driver.py
    qpid/trunk/qpid/python/qpid/testlib.py
    qpid/trunk/qpid/python/qpid/tests/__init__.py
    qpid/trunk/qpid/python/qpid/util.py
    qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_8/basic.py
    qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_9/__init__.py

Modified: qpid/trunk/qpid/python/qpid/client.py
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/client.py?rev=1406584&r1=1406583&r2=1406584&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/client.py (original)
+++ qpid/trunk/qpid/python/qpid/client.py Wed Nov  7 12:27:56 2012
@@ -18,13 +18,14 @@
 #
 
 """
-An AQMP client implementation that uses a custom delegate for
+An AMQP client implementation that uses a custom delegate for
 interacting with the server.
 """
 
 import os, threading
 from peer import Peer, Channel, Closed
 from delegate import Delegate
+from util import get_client_properties_with_defaults
 from connection08 import Connection, Frame, connect
 from spec08 import load
 from queue import Queue
@@ -76,12 +77,12 @@ class Client:
       self.lock.release()
     return q
 
-  def start(self, response, mechanism="AMQPLAIN", locale="en_US", tune_params=None):
+  def start(self, response, mechanism="AMQPLAIN", locale="en_US", tune_params=None, client_properties=None):
     self.mechanism = mechanism
     self.response = response
     self.locale = locale
     self.tune_params = tune_params
-
+    self.client_properties=get_client_properties_with_defaults(provided_client_properties=client_properties)
     self.socket = connect(self.host, self.port)
     self.conn = Connection(self.socket, self.spec)
     self.peer = Peer(self.conn, ClientDelegate(self), Session)
@@ -128,7 +129,8 @@ class ClientDelegate(Delegate):
   def connection_start(self, ch, msg):
     msg.start_ok(mechanism=self.client.mechanism,
                  response=self.client.response,
-                 locale=self.client.locale)
+                 locale=self.client.locale,
+                 client_properties=self.client.client_properties)
 
   def connection_tune(self, ch, msg):
     if self.client.tune_params:

Modified: qpid/trunk/qpid/python/qpid/connection08.py
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/connection08.py?rev=1406584&r1=1406583&r2=1406584&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/connection08.py (original)
+++ qpid/trunk/qpid/python/qpid/connection08.py Wed Nov  7 12:27:56 2012
@@ -28,6 +28,9 @@ from cStringIO import StringIO
 from codec import EOF
 from compat import SHUT_RDWR
 from exceptions import VersionError
+from logging import getLogger, DEBUG
+
+log = getLogger("qpid.connection08")
 
 class SockIO:
 
@@ -35,7 +38,8 @@ class SockIO:
     self.sock = sock
 
   def write(self, buf):
-#    print "OUT: %r" % buf
+    if log.isEnabledFor(DEBUG):
+      log.debug("OUT: %r", buf)
     self.sock.sendall(buf)
 
   def read(self, n):
@@ -47,8 +51,9 @@ class SockIO:
         break
       if len(s) == 0:
         break
-#      print "IN: %r" % s
       data += s
+    if log.isEnabledFor(DEBUG):
+      log.debug("IN: %r", data)
     return data
 
   def flush(self):
@@ -120,19 +125,25 @@ class Connection:
                            (self.spec.major, self.spec.minor, major, minor))
       else:
         raise FramingError("unknown frame type: %s" % tid)
-    channel = c.decode_short()
-    body = c.decode_longstr()
-    dec = codec.Codec(StringIO(body), self.spec)
-    frame = Frame.DECODERS[type].decode(self.spec, dec, len(body))
-    frame.channel = channel
-    end = c.decode_octet()
-    if end != self.FRAME_END:
-      garbage = ""
-      while end != self.FRAME_END:
-        garbage += chr(end)
-        end = c.decode_octet()
-      raise "frame error: expected %r, got %r" % (self.FRAME_END, garbage)
-    return frame
+    try:
+      channel = c.decode_short()
+      body = c.decode_longstr()
+      dec = codec.Codec(StringIO(body), self.spec)
+      frame = Frame.DECODERS[type].decode(self.spec, dec, len(body))
+      frame.channel = channel
+      end = c.decode_octet()
+      if end != self.FRAME_END:
+       garbage = ""
+       while end != self.FRAME_END:
+         garbage += chr(end)
+         end = c.decode_octet()
+       raise "frame error: expected %r, got %r" % (self.FRAME_END, garbage)
+      return frame
+    except EOF:
+      # An EOF caught here can indicate an error decoding the frame,
+      # rather than that a disconnection occurred,so it's worth logging it.
+      log.exception("Error occurred when reading frame with tid %s" % tid)
+      raise
 
   def write_0_9(self, frame):
     self.write_8_0(frame)

Modified: qpid/trunk/qpid/python/qpid/delegates.py
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/delegates.py?rev=1406584&r1=1406583&r2=1406584&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/delegates.py (original)
+++ qpid/trunk/qpid/python/qpid/delegates.py Wed Nov  7 12:27:56 2012
@@ -18,7 +18,7 @@
 #
 
 import os, connection, session
-from util import notify
+from util import notify, get_client_properties_with_defaults
 from datatypes import RangedSet
 from exceptions import VersionError, Closed
 from logging import getLogger
@@ -137,24 +137,12 @@ class Server(Delegate):
 
 class Client(Delegate):
 
-  ppid = 0
-  try:
-    ppid = os.getppid()
-  except:
-    pass
-
-  PROPERTIES = {"product": "qpid python client",
-                "version": "development",
-                "platform": os.name,
-                "qpid.client_process": os.path.basename(sys.argv[0]),
-                "qpid.client_pid": os.getpid(),
-                "qpid.client_ppid": ppid}
-
   def __init__(self, connection, username=None, password=None,
                mechanism=None, heartbeat=None, **kwargs):
     Delegate.__init__(self, connection)
-    self.client_properties=Client.PROPERTIES.copy()
-    self.client_properties.update(kwargs.get("client_properties",{}))
+    provided_client_properties = kwargs.get("client_properties")
+    self.client_properties=get_client_properties_with_defaults(provided_client_properties)
+
     ##
     ## self.acceptableMechanisms is the list of SASL mechanisms that the client is willing to
     ## use.  If it's None, then any mechanism is acceptable.

Modified: qpid/trunk/qpid/python/qpid/messaging/driver.py
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/messaging/driver.py?rev=1406584&r1=1406583&r2=1406584&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/messaging/driver.py (original)
+++ qpid/trunk/qpid/python/qpid/messaging/driver.py Wed Nov  7 12:27:56 2012
@@ -31,7 +31,7 @@ from qpid.messaging.exceptions import *
 from qpid.messaging.message import get_codec, Disposition, Message
 from qpid.ops import *
 from qpid.selector import Selector
-from qpid.util import URL, default
+from qpid.util import URL, default,get_client_properties_with_defaults
 from qpid.validator import And, Context, List, Map, Types, Values
 from threading import Condition, Thread
 
@@ -90,20 +90,6 @@ SUBJECT_DEFAULTS = {
   "topic": "#"
   }
 
-# XXX
-ppid = 0
-try:
-  ppid = os.getppid()
-except:
-  pass
-
-CLIENT_PROPERTIES = {"product": "qpid python client",
-                     "version": "development",
-                     "platform": os.name,
-                     "qpid.client_process": os.path.basename(sys.argv[0]),
-                     "qpid.client_pid": os.getpid(),
-                     "qpid.client_ppid": ppid}
-
 def noop(): pass
 def sync_noop(): pass
 
@@ -710,8 +696,7 @@ class Engine:
     except sasl.SASLError, e:
       raise AuthenticationFailure(text=str(e))
 
-    client_properties = CLIENT_PROPERTIES.copy()
-    client_properties.update(self.connection.client_properties)
+    client_properties = get_client_properties_with_defaults(provided_client_properties=self.connection.client_properties);
     self.write_op(ConnectionStartOk(client_properties=client_properties,
                                     mechanism=mech, response=initial))
 

Modified: qpid/trunk/qpid/python/qpid/testlib.py
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/testlib.py?rev=1406584&r1=1406583&r2=1406584&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/testlib.py (original)
+++ qpid/trunk/qpid/python/qpid/testlib.py Wed Nov  7 12:27:56 2012
@@ -73,7 +73,7 @@ class TestBase(unittest.TestCase):
         else:
             self.client.close()
 
-    def connect(self, host=None, port=None, user=None, password=None, tune_params=None):
+    def connect(self, host=None, port=None, user=None, password=None, tune_params=None, client_properties=None):
         """Create a new connction, return the Client object"""
         host = host or self.config.broker.host
         port = port or self.config.broker.port or 5672
@@ -82,9 +82,9 @@ class TestBase(unittest.TestCase):
         client = qpid.client.Client(host, port)
         try:
             if client.spec.major == 8 and client.spec.minor == 0:
-                client.start({"LOGIN": user, "PASSWORD": password}, tune_params=tune_params)
+                client.start({"LOGIN": user, "PASSWORD": password}, tune_params=tune_params, client_properties=client_properties)
             else:
-                client.start("\x00" + user + "\x00" + password, mechanism="PLAIN", tune_params=tune_params)
+                client.start("\x00" + user + "\x00" + password, mechanism="PLAIN", tune_params=tune_params, client_properties=client_properties)
         except qpid.client.Closed, e:
             if isinstance(e.args[0], VersionError):
                 raise Skipped(e.args[0])

Modified: qpid/trunk/qpid/python/qpid/tests/__init__.py
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/tests/__init__.py?rev=1406584&r1=1406583&r2=1406584&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/tests/__init__.py (original)
+++ qpid/trunk/qpid/python/qpid/tests/__init__.py Wed Nov  7 12:27:56 2012
@@ -37,6 +37,7 @@ import qpid.tests.datatypes
 import qpid.tests.connection
 import qpid.tests.spec010
 import qpid.tests.codec010
+import qpid.tests.util
 
 class TestTestsXXX(Test):
 

Added: qpid/trunk/qpid/python/qpid/tests/util.py
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/tests/util.py?rev=1406584&view=auto
==============================================================================
--- qpid/trunk/qpid/python/qpid/tests/util.py (added)
+++ qpid/trunk/qpid/python/qpid/tests/util.py Wed Nov  7 12:27:56 2012
@@ -0,0 +1,46 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+from unittest import TestCase
+from qpid.util import get_client_properties_with_defaults
+
+class UtilTest (TestCase):
+
+  def test_get_spec_recommended_client_properties(self):
+    client_properties = get_client_properties_with_defaults(provided_client_properties={"mykey":"myvalue"})
+    self.assertTrue("product" in client_properties)
+    self.assertTrue("version" in client_properties)
+    self.assertTrue("platform" in client_properties)
+
+  def test_get_client_properties_with_provided_value(self):
+    client_properties = get_client_properties_with_defaults(provided_client_properties={"mykey":"myvalue"})
+    self.assertTrue("product" in client_properties)
+    self.assertTrue("mykey" in client_properties)
+    self.assertEqual("myvalue", client_properties["mykey"])
+
+  def test_get_client_properties_with_no_provided_values(self):
+    client_properties = get_client_properties_with_defaults(provided_client_properties=None)
+    self.assertTrue("product" in client_properties)
+
+    client_properties = get_client_properties_with_defaults()
+    self.assertTrue("product" in client_properties)
+
+  def test_get_client_properties_with_provided_value_that_overrides_default(self):
+    client_properties = get_client_properties_with_defaults(provided_client_properties={"version":"myversion"})
+    self.assertEqual("myversion", client_properties["version"])
+

Modified: qpid/trunk/qpid/python/qpid/util.py
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/util.py?rev=1406584&r1=1406583&r2=1406584&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/util.py (original)
+++ qpid/trunk/qpid/python/qpid/util.py Wed Nov  7 12:27:56 2012
@@ -17,7 +17,7 @@
 # under the License.
 #
 
-import os, socket, time, textwrap, re
+import os, socket, time, textwrap, re, sys
 
 try:
   from ssl import wrap_socket as ssl
@@ -42,6 +42,24 @@ except ImportError:
     def close(self):
       self.sock.close()
 
+def get_client_properties_with_defaults(provided_client_properties={}):
+  ppid = 0
+  try:
+    ppid = os.getppid()
+  except:
+    pass
+
+  client_properties = {"product": "qpid python client",
+                       "version": "development",
+                       "platform": os.name,
+                       "qpid.client_process": os.path.basename(sys.argv[0]),
+                       "qpid.client_pid": os.getpid(),
+                       "qpid.client_ppid": ppid}
+
+  if provided_client_properties:
+    client_properties.update(provided_client_properties)
+  return client_properties
+
 def connect(host, port):
   for res in socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM):
     af, socktype, proto, canonname, sa = res

Modified: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_8/basic.py
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_8/basic.py?rev=1406584&r1=1406583&r2=1406584&view=diff
==============================================================================
--- qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_8/basic.py (original)
+++ qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_8/basic.py Wed Nov  7 12:27:56 2012
@@ -79,6 +79,51 @@ class BasicTests(TestBase):
         except Closed, e:
             self.assertChannelException(403, e.args[0])
 
+    def test_reconnect_to_durable_subscription(self):
+      try:
+        publisherchannel = self.channel
+       my_id = "my_id"
+        consumer_connection_properties_with_instance = {"instance": my_id}
+        queue_for_subscription = "queue_for_subscription_%s" % my_id
+        topic_name = "my_topic_name"
+        test_message = self.uniqueString()
+
+       durable_subscription_client = self.connect(client_properties=consumer_connection_properties_with_instance)
+        consumerchannel = durable_subscription_client.channel(1)
+        consumerchannel.channel_open()
+
+        self._declare_and_bind_exclusive_queue_on_topic_exchange(consumerchannel, queue_for_subscription, topic_name)
+
+       # disconnect
+       durable_subscription_client.close()
+
+       # send message to topic
+        publisherchannel.basic_publish(routing_key=topic_name, exchange="amq.topic", content=Content(test_message))
+
+        # reconnect and consume message
+       durable_subscription_client = self.connect(client_properties=consumer_connection_properties_with_instance)
+        consumerchannel = durable_subscription_client.channel(1)
+        consumerchannel.channel_open()
+
+        self._declare_and_bind_exclusive_queue_on_topic_exchange(consumerchannel, queue_for_subscription, topic_name)
+
+        # Create consumer and consume the message that was sent whilst subscriber was disconnected.  By convention we
+        # declare the consumer as exclusive to forbid concurrent access.
+        subscription = consumerchannel.basic_consume(queue=queue_for_subscription, exclusive=True)
+        queue = durable_subscription_client.queue(subscription.consumer_tag)
+
+       # consume and verify message content
+        msg = queue.get(timeout=1)
+        self.assertEqual(test_message, msg.content.body)
+        consumerchannel.basic_ack(delivery_tag=msg.delivery_tag)
+      finally:
+        publisherchannel.queue_delete(queue=queue_for_subscription)
+       durable_subscription_client.close()
+
+    def _declare_and_bind_exclusive_queue_on_topic_exchange(self, channel, queue, topic_name):
+        channel.queue_declare(queue=queue, exclusive=True, auto_delete=False, durable=True)
+        channel.queue_bind(exchange="amq.topic", queue=queue, routing_key=topic_name)
+
     def test_consume_queue_errors(self):
         """
         Test error conditions associated with the queue field of the consume method:

Modified: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_9/__init__.py
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_9/__init__.py?rev=1406584&r1=1406583&r2=1406584&view=diff
==============================================================================
--- qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_9/__init__.py (original)
+++ qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_9/__init__.py Wed Nov  7 12:27:56 2012
@@ -19,4 +19,4 @@
 # under the License.
 #
 
-import query, queue
+import query, queue, messageheader

Added: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_9/messageheader.py
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_9/messageheader.py?rev=1406584&view=auto
==============================================================================
--- qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_9/messageheader.py (added)
+++ qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_9/messageheader.py Wed Nov  7 12:27:56 2012
@@ -0,0 +1,35 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from qpid.testlib import TestBase
+
+class MessageHeaderTests(TestBase):
+    """Verify that messages with headers work as expected"""
+
+    def test_message_with_integer_header(self):
+        props={"headers":{"one":1, "zero":0}}
+        self.queue_declare(queue="q")
+        q = self.consume("q")
+        self.assertPublishGet(q, routing_key="q", properties=props)
+
+    def test_message_with_string_header(self):
+        props={"headers":{"mystr":"hello world", "myempty":""}}
+        self.queue_declare(queue="q")
+        q = self.consume("q")
+        self.assertPublishGet(q, routing_key="q", properties=props)



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to