Author: aconway
Date: Thu Dec 11 15:51:01 2014
New Revision: 1644684
URL: http://svn.apache.org/r1644684
Log:
DISPATCH-82: Poor error handling by qdmanage and qdstat tools.
Fixed error handling for qd tools as follows:
- Forked future proton reactor work into qpid_dispatch.internal.proton_future
- Added proton_future.util.SyncRequestResponse to implement request-response
pattern
- Updated qpid_dispatch.management.client to use SyncRequestResponse
When the reactor work has made it into a proton release, the proton_future
package
should be removed.
Added:
qpid/dispatch/trunk/python/qpid_dispatch_internal/proton_future/
qpid/dispatch/trunk/python/qpid_dispatch_internal/proton_future/__init__.py
qpid/dispatch/trunk/python/qpid_dispatch_internal/proton_future/handlers.py
qpid/dispatch/trunk/python/qpid_dispatch_internal/proton_future/reactors.py
qpid/dispatch/trunk/python/qpid_dispatch_internal/proton_future/utils.py
Modified:
qpid/dispatch/trunk/python/qpid_dispatch/management/client.py
qpid/dispatch/trunk/tests/system_test.py
qpid/dispatch/trunk/tests/system_tests_management.py
qpid/dispatch/trunk/tools/qdstat
Modified: qpid/dispatch/trunk/python/qpid_dispatch/management/client.py
URL:
http://svn.apache.org/viewvc/qpid/dispatch/trunk/python/qpid_dispatch/management/client.py?rev=1644684&r1=1644683&r2=1644684&view=diff
==============================================================================
--- qpid/dispatch/trunk/python/qpid_dispatch/management/client.py (original)
+++ qpid/dispatch/trunk/python/qpid_dispatch/management/client.py Thu Dec 11
15:51:01 2014
@@ -21,49 +21,12 @@
AMQP management client for Qpid dispatch.
"""
+import qpid_dispatch_site
import proton, threading
from proton import Url
from .error import *
from .entity import EntityBase, clean_dict
-
-class MessengerImpl(object):
- """
- Messaging implementation for L{Node} based on proton.Messenger
- @ivar reply_to: address for replies to the node.
- """
-
- def __init__(self, address, timeout=None):
- self.messenger = proton.Messenger()
- self.messenger.start()
- self.messenger.timeout = timeout
- subscribe_address = Url(address)
- subscribe_address.path = "#"
- self.subscription = self.messenger.subscribe(str(subscribe_address))
- self._flush()
- self.reply_to = self.subscription.address
- if not self.reply_to:
- raise ValueError("Failed to subscribe to %s"%subscribe_address)
-
- def send(self, request):
- """Send a message"""
- self.messenger.put(request)
- self.messenger.send()
- self._flush()
-
- def fetch(self):
- """Wait for a single message."""
- self.messenger.recv(1)
- response = proton.Message()
- self.messenger.get(response)
- return response
-
- def _flush(self):
- """Call self.messenger.work() till there is no work left."""
- while self.messenger.work(0.1): pass
-
- def stop(self):
- """Stop the messaging implementation"""
- self.messenger.stop()
+from qpid_dispatch_internal.proton_future.utils import SyncRequestResponse,
BlockingConnection
class Entity(EntityBase):
@@ -109,50 +72,36 @@ class Entity(EntityBase):
class Node(object):
"""Client proxy for an AMQP management node"""
- def __init__(self, address=None, router=None, locales=None, timeout=10,
message_impl=None):
+ def __init__(self, url=None, router=None, locales=None, timeout=10,
connection=None):
"""
- @param address: AMQP address of the management node.
+ @param url: URL of the management node.
@param router: If address does not contain a path, use the management
node for this router ID.
If not specified and address does not contain a path, use the
default management node.
@param locales: Default list of locales for management operations.
+ @param client: a L{BlockingConnection}
"""
self.name = self.identity = 'self'
self.type = 'org.amqp.management' # AMQP management node type
-
- self.address = Url(address).defaults()
+ self.url = Url(url).defaults()
self.locales = locales
- if self.address.path is None:
+ if self.url.path is None:
if router:
- self.address.path = '_topo/0/%s/$management' % router
+ self.url.path = '_topo/0/%s/$management' % router
else:
- self.address.path = '$management'
- self.responses = {}
- self.message_impl = message_impl or MessengerImpl(self.address,
timeout=timeout)
- self.reply_to = self.message_impl.reply_to
+ self.url.path = '$management'
+ connection=connection or BlockingConnection(url, timeout)
+ self.client = SyncRequestResponse(connection, self.url.path)
+ self.reply_to = self.client.reply_to
- def stop(self):
+ def close(self):
"""Shut down the node"""
- if not self.message_impl: return
- self.message_impl.stop()
- self.message_impl = None
-
- def __del__(self):
- if hasattr(self, 'message_impl'):
- try: self.stop()
- except: pass
+ if self.client:
+ self.client.close()
+ self.client = None
def __repr__(self):
return "%s(%s)"%(self.__class__.__name__, self.address)
- CORRELATION_ID = 0
- CORRELATION_LOCK = threading.Lock()
-
- def correlation_id(self):
- """Get the next correlation ID. Thread safe."""
- with self.CORRELATION_LOCK:
- Node.CORRELATION_ID += 1
- return Node.CORRELATION_ID
-
@staticmethod
def check_response(response, expect=OK):
"""
@@ -176,9 +125,6 @@ class Node(object):
"""
if self.locales: properties.setdefault('locales', self.locales)
request = proton.Message()
- request.address = str(self.address)
- request.reply_to = self.reply_to
- request.correlation_id = self.correlation_id()
request.properties = clean_dict(properties)
request.body = body or {}
return request
@@ -192,15 +138,7 @@ class Node(object):
Send a management request message, wait for a response.
@return: Response message.
"""
- if not request.address:
- raise ValueError("Message must have an address")
- if not request.reply_to:
- raise ValueError("Message must have reply_to %s", request)
- self.message_impl.send(request)
- while True:
- response = self.message_impl.fetch()
- # Ignore mismatched correlation IDs, responses to earlier requests
that timed out.
- if response.correlation_id == request.correlation_id: break
+ response = self.client.call(request)
self.check_response(response, expect=expect)
return response
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]