Author: tross
Date: Wed Dec 16 22:24:57 2009
New Revision: 891456

URL: http://svn.apache.org/viewvc?rev=891456&view=rev
Log:
QPID-2261 - Applied patch from Ken Giusti

Added:
    qpid/branches/qmfv2/qpid/python/qmf/test/console_test.py
Modified:
    qpid/branches/qmfv2/qpid/python/qmf/qmfAgent.py
    qpid/branches/qmfv2/qpid/python/qmf/qmfCommon.py
    qpid/branches/qmfv2/qpid/python/qmf/qmfConsole.py
    qpid/branches/qmfv2/qpid/python/qmf/test/agent_test.py

Modified: qpid/branches/qmfv2/qpid/python/qmf/qmfAgent.py
URL: http://svn.apache.org/viewvc/qpid/branches/qmfv2/qpid/python/qmf/qmfAgent.py?rev=891456&r1=891455&r2=891456&view=diff
==============================================================================
--- qpid/branches/qmfv2/qpid/python/qmf/qmfAgent.py (original)
+++ qpid/branches/qmfv2/qpid/python/qmf/qmfAgent.py Wed Dec 16 22:24:57 2009
@@ -21,12 +21,12 @@
 import socket
 import os
 import logging
-from threading import Thread
+from threading import Thread, Lock
 from qpid.messaging import Connection, Message
 from uuid import uuid4
 from qmfCommon import (AMQP_QMF_TOPIC, AMQP_QMF_DIRECT, AMQP_QMF_AGENT_LOCATE, 
                        AMQP_QMF_AGENT_INDICATION, AgentId, QmfManaged, makeSubject,
-                       parseSubject, OpCode)
+                       parseSubject, OpCode, Query, SchemaObjectClass, _doQuery)
 
 
 
@@ -49,6 +49,10 @@
         self._address = str(self._id)
         self._notifier = notifier
         self._conn = None
+        self._lock = Lock()
+        self._data_schema = {}
+        self._event_schema = {}
+        self._agent_data = {}
 
     def getAgentId(self):
         return AgentId(self.vendor, self.product, self.name)
@@ -61,52 +65,66 @@
         self._ind_sender = self._session.sender(AMQP_QMF_AGENT_INDICATION)
         self._running = True
         self.start()
+    
+    def registerObjectClass(self, schema):
+        """
+        Register an instance of a SchemaObjectClass with this agent
+        """
+        # @todo: need to update subscriptions
+        # @todo: need to mark schema as "non-const"
+        if not isinstance(schema, SchemaObjectClass):
+            raise TypeError("SchemaObjectClass instance expected")
 
+        self._lock.acquire()
+        try:
+            self._data_schema[schema.getClassId()] = schema
+        finally:
+            self._lock.release()
 
-    def _dispatch(self, msg, _direct=False):
+
+    def registerEventClass(self, cls):
+        logging.error("!!!Agent.registerEventClass() TBD!!!")
+
+    def raiseEvent(self, qmfEvent):
+        logging.error("!!!Agent.raiseEvent() TBD!!!")
+
+    def addObject(self, data ):
         """
-        @param _direct: True if msg directly addressed to this agent.
+        Register an instance of a QmfAgentData object.
         """
+        # @todo: need to update subscriptions
+        # @todo: need to mark schema as "non-const"
+        if not isinstance(data, QmfAgentData):
+            raise TypeError("QmfAgentData instance expected")
+
+        self._lock.acquire()
         try:
-            version,opcode = parseSubject(msg.subject)
-        except:
-            logging.debug("Ignoring unrecognized message '%s'" % msg.subject)
-            return
+            self._agent_data[data.getObjectId()] = data
+        finally:
+            self._lock.release()
 
-        cmap = {}; props={}
-        if msg.content_type == "amqp/map":
-            cmap = msg.content
-        if msg.properties:
-            props = msg.properties
 
-        if opcode == OpCode.agent_locate:
-            reply = False
-            if "method" in props and props["method"] == "request":
-                if "query" in cmap:
-                    if self._doQuery(cmap["query"]):
-                        reply=True
-                else:
-                    reply=True
+    def methodResponse(self, context, status, text, arguments):
+        logging.error("!!!Agent.methodResponse() TBD!!!")
 
-            if reply:
-                try:
-                    tmp_snd = self._session.sender( msg.reply_to )
-                    m = Message( subject=makeSubject(OpCode.agent_locate),
-                                 properties={"method":"response"},
-                                 content={"name": {"vendor":"redhat.com",
-                                                   "product":"agent",
-                                                   "name":"tross"}},
-                                 correlation_id=msg.correlation_id)
-                    tmp_snd.send(m)
-                    logging.debug("reply-to [%s] sent" % msg.reply_to)
-                except e:
-                    logging.error("Failed to send reply to msg '%s'" % str(e))
-            else:
-                logging.debug("Ignoring invalid agent-locate msg")
-        else:
-            logging.warning("Ignoring message with unrecognized 'opcode' value: '%s'"
-                            % opcode)
+    def getWorkItemCount(self): 
+        """ 
+        Returns the count of pending WorkItems that can be retrieved.
+        """
+        logging.error("!!!Agent.getWorkItemCount() TBD!!!")
 
+    def getNextWorkItem(self, timeout=None): 
+        """
+        Obtains the next pending work item, or None if none available. 
+        """
+        logging.error("!!!Agent.getNextWorkItem() TBD!!!")
+
+    def releaseWorkItem(self, wi): 
+        """
+        Releases a WorkItem instance obtained by getNextWorkItem(). Called when 
+        the application has finished processing the WorkItem. 
+        """
+        logging.error("!!!Agent.releaseWorkItem() TBD!!!")
 
 
     def run(self):
@@ -114,7 +132,7 @@
         while self._running:
             try:
                 msg = self._locate_receiver.fetch(1)
-                logging.info("Agent Locate Rcvd: '%s'" % msg)
+                logging.debug("Agent Locate Rcvd: '%s'" % msg)
                 if msg.content_type == "amqp/map":
                     self._dispatch(msg, _direct=False)
             except KeyboardInterrupt:
@@ -124,7 +142,7 @@
 
             try:
                 msg = self._direct_receiver.fetch(1)
-                logging.info("Agent Msg Rcvd: '%s'" % msg)
+                logging.debug("Agent Msg Rcvd: '%s'" % msg)
                 if msg.content_type == "amqp/map":
                     self._dispatch(msg, _direct=True)
             except KeyboardInterrupt:
@@ -132,61 +150,100 @@
             except:
                 pass
 
+            # @todo: actually implement the periodic agent-ind
+            # message generation!
             count+= 1
             if count == 5:
                 count = 0
-                m = Message( subject=makeSubject(OpCode.agent_ind),
-                             properties={"method":"indication"},
-                             content={"name": {"vendor":"redhat.com",
-                                               "product":"agent",
-                                               "name":"tross"}} )
-                self._ind_sender.send(m)
-                logging.info("Agent Indication Sent")
+                self._ind_sender.send(self._makeAgentIndMsg())
+                logging.debug("Agent Indication Sent")
 
-    
-    def registerObjectClass(self, cls):
-        logging.error("!!!Agent.registerObjectClass() TBD!!!")
-
-    def registerEventClass(self, cls):
-        logging.error("!!!Agent.registerEventClass() TBD!!!")
+    #
+    # Private:
+    #
 
-    def raiseEvent(self, qmfEvent):
-        logging.error("!!!Agent.raiseEvent() TBD!!!")
+    def _makeAgentIndMsg(self):
+        """
+        Create an agent indication message identifying this agent
+        """
+        return Message( subject=makeSubject(OpCode.agent_ind),
+                        properties={"method":"response"},
+                        content={Query._TARGET_AGENT_ID: 
+                                 self.getAgentId().mapEncode()})
 
-    def addObject(self, qmfAgentData ):
-        logging.error("!!!Agent.addObject() TBD!!!")
 
-    def methodResponse(self, context, status, text, arguments):
-        logging.error("!!!Agent.methodResponse() TBD!!!")
 
-    def getWorkItemCount(self): 
-        """ 
-        Returns the count of pending WorkItems that can be retrieved.
+    def _dispatch(self, msg, _direct=False):
         """
-        logging.error("!!!Agent.getWorkItemCount() TBD!!!")
+        Process a message from a console.
 
-    def getNextWorkItem(self, timeout=None): 
-        """
-        Obtains the next pending work item, or None if none available. 
+        @param _direct: True if msg directly addressed to this agent.
         """
-        logging.error("!!!Agent.getNextWorkItem() TBD!!!")
+        logging.error( "Message received from Console! [%s]" % msg )
+        try:
+            version,opcode = parseSubject(msg.subject)
+        except:
+            logging.debug("Ignoring unrecognized message '%s'" % msg.subject)
+            return
 
-    def releaseWorkItem(self, wi): 
+        cmap = {}; props={}
+        if msg.content_type == "amqp/map":
+            cmap = msg.content
+        if msg.properties:
+            props = msg.properties
+
+        if opcode == OpCode.agent_locate:
+            self._handleAgentLocateMsg( msg, cmap, props, version, _direct )
+        elif opcode == OpCode.get_query:
+            logging.warning("!!! GET_QUERY TBD !!!")
+        elif opcode == OpCode.method_req:
+            logging.warning("!!! METHOD_REQ TBD !!!")
+        elif opcode == OpCode.cancel_subscription:
+            logging.warning("!!! CANCEL_SUB TBD !!!")
+        elif opcode == OpCode.create_subscription:
+            logging.warning("!!! CREATE_SUB TBD !!!")
+        elif opcode == OpCode.renew_subscription:
+            logging.warning("!!! RENEW_SUB TBD !!!")
+        elif opcode == OpCode.schema_query:
+            logging.warning("!!! SCHEMA_QUERY TBD !!!")
+        elif opcode == OpCode.noop:
+            logging.debug("No-op msg received.")
+        else:
+            logging.warning("Ignoring message with unrecognized 'opcode' value: '%s'"
+                            % opcode)
+
+    def _handleAgentLocateMsg( self, msg, cmap, props, version, direct ):
         """
-        Releases a WorkItem instance obtained by getNextWorkItem(). Called when 
-        the application has finished processing the WorkItem. 
+        Process a received agent-locate message
         """
-        logging.error("!!!Agent.releaseWorkItem() TBD!!!")
+        logging.debug("_handleAgentLocateMsg")
+
+        reply = True
+        if "method" in props and props["method"] == "request":
+            if "query" in cmap:
+                query = cmap["query"]
+                # is the query an agent locate?
+                if Query._TARGET in query and query[Query._TARGET] == {Query._TARGET_AGENT_ID:None}:
+                    if Query._PREDICATE in query:
+                        # does this agent match the predicate?
+                        reply = _doQuery( query[Query._PREDICATE], self.getAgentId().mapEncode() )
+                else:
+                    reply = False
+                    logging.debug("Ignoring query - not an agent-id query: '%s'" % query)
+                reply=True
+
+        if reply:
+            try:
+                tmp_snd = self._session.sender( msg.reply_to )
+                m = self._makeAgentIndMsg()
+                m.correlation_id = msg.correlation_id
+                tmp_snd.send(m)
+                logging.debug("agent-ind sent to [%s]" % msg.reply_to)
+            except:
+                logging.error("Failed to send reply to agent-ind msg '%s'" % msg)
+        else:
+            logging.debug("agent-locate msg not mine - no reply sent")
 
-    def _doQuery(self, query):
-        # query = cmap["query"]
-        # if ("vendor" in query and (query["vendor"] == "*" or query["vendor"] == self.vendor) and
-        #     "product" in query and (query["product"] == "*" or query["product"] == self.product) and
-        #     "name" in query and (query["name"] == "*" or query["name"] == self.name)):
-        #     logging.debug("Query received for %s:%s:%s" % (self.vendor, self.product, self.name))
-        #     logging.debug("reply-to [%s], cid=%s" % (msg.reply_to, msg.correlation_id))
-        logging.error("!!!Agent._doQuery() TBD!!!")
-        return True
 
 
   
##==============================================================================
@@ -228,7 +285,7 @@
 
 if __name__ == '__main__':
     import time
-    #logging.getLogger().setLevel(logging.INFO)
+    logging.getLogger().setLevel(logging.INFO)
     logging.info( "Starting Connection" )
     _c = Connection("localhost")
     _c.connect()

Modified: qpid/branches/qmfv2/qpid/python/qmf/qmfCommon.py
URL: http://svn.apache.org/viewvc/qpid/branches/qmfv2/qpid/python/qmf/qmfCommon.py?rev=891456&r1=891455&r2=891456&view=diff
==============================================================================
--- qpid/branches/qmfv2/qpid/python/qmf/qmfCommon.py (original)
+++ qpid/branches/qmfv2/qpid/python/qmf/qmfCommon.py Wed Dec 16 22:24:57 2009
@@ -48,9 +48,28 @@
 AMQP_QMF_SUBJECT_FMT = "%s%d.%s"
 
 class OpCode(object):
+    noop = "noop"
+
+    # codes sent by a console and processed by the agent
     agent_locate = "agent-locate"
+    cancel_subscription = "cancel-subscription"
+    create_subscription = "create-subscription"
+    get_query = "get_query"
+    method_req = "method"
+    renew_subscription = "renew-subscription"
+    schema_query = "schema-query"  # @todo: deprecate
+
+    # codes sent by the agent to a console
     agent_ind = "agent"
-    noop = "noop"
+    data_ind = "data"
+    event_ind = "event"
+    managed_object = "managed-object"
+    object_ind = "object"
+    response = "response"
+    schema_ind="schema"   # @todo: deprecate
+
+
+
 
 def makeSubject(_code): 
     """
@@ -69,6 +88,19 @@
     return _sub[3:].split('.', 1)
 
 
+class Notifier(object):
+    """
+    Virtual base class that defines a call back which alerts the application that
+    a QMF Console notification is pending.
+    """
+    def indication(self):
+        """
+        Called when one or more items are ready for the application to process.
+        This method may be called by an internal QMF library thread.  Its purpose is to
+        indicate that the application should process pending work items.
+        """
+        raise Exception("The indication method must be overridden by the application!")
+
 
 
 
##==============================================================================
@@ -966,7 +998,127 @@
 #   
##==============================================================================
 
 
+
+def _doQuery(predicate, params ):
+    """
+    Given the predicate from a query, and a map of named parameters, apply the predicate
+    to the parameters, and return True or False.
+    """
+    if type(predicate) != list or len(predicate) < 1:
+        return False
+
+    opr = predicate[0]
+    if opr == Query._CMP_TRUE:
+        logging.info("_doQuery() TRUE: [%s]" % predicate )
+        return True
+    elif opr == Query._CMP_FALSE:
+        logging.info("_doQuery() FALSE: [%s]" % predicate )
+        return False
+    elif opr == Query._LOGIC_AND:
+        logging.info("_doQuery() AND: [%s]" % predicate )
+        rc = False
+        for exp in predicate[1:]:
+            rc = _doQuery( exp, params )
+            if not rc:
+                break
+        return rc
+
+    elif opr == Query._LOGIC_OR:
+        logging.info("_doQuery() OR: [%s]" % predicate )
+        rc = False
+        for exp in predicate[1:]:
+            rc = _doQuery( exp, params )
+            if rc:
+                break
+        return rc
+
+    elif opr == Query._LOGIC_NOT:
+        logging.info("_doQuery() NOT: [%s]" % predicate )
+        if len(predicate) != 2:
+            logging.warning("Malformed query not-expression received: '%s'" % predicate)
+            return False
+        return not _doQuery( predicate[1:], params )
+
+    elif opr in [Query._CMP_EQ, Query._CMP_NE, Query._CMP_LT, 
+                 Query._CMP_LE, Query._CMP_GT, Query._CMP_GE,
+                 Query._CMP_RE]:
+        if len(predicate) != 3:
+            logging.warning("Malformed query compare expression received: '%s'" % predicate)
+            return False
+        # @todo: support regular expression match
+        name = predicate[1]
+        if name not in params:
+            logging.warning("Malformed query, attribute '%s' not present." % name)
+            return False
+        arg1 = params[name]
+        arg1Type = type(arg1)
+        logging.info("_doQuery() CMP: [%s] value='%s'" % (predicate, arg1) )
+        try:
+            arg2 = arg1Type(predicate[2])
+            if opr == Query._CMP_EQ: return arg1 == arg2
+            if opr == Query._CMP_NE: return arg1 != arg2
+            if opr == Query._CMP_LT: return arg1 < arg2
+            if opr == Query._CMP_LE: return arg1 <= arg2
+            if opr == Query._CMP_GT: return arg1 > arg2
+            if opr == Query._CMP_GE: return arg1 >= arg2
+            if opr == Query._CMP_RE: 
+                logging.error("!!! RE QUERY TBD !!!")
+                return False
+        except:
+            logging.warning("Malformed query, unable to compare '%s'" % predicate)
+        return False
+
+    elif opr == Query._CMP_PRESENT:
+        logging.info("_doQuery() PRESENT: [%s]" % predicate )
+        if len(predicate) != 2:
+            logging.warning("Malformed query present expression received: '%s'" % predicate)
+            return False
+        name = predicate[1]
+        return name in params
+
+    else:
+        logging.warning("Unknown query operator received: '%s'" % opr)
+    return False
+
+
+
 class Query:
+    _TARGET="what"
+    _PREDICATE="where"
+
+    _TARGET_PACKAGES="_packages"
+    _TARGET_OBJECT_ID="_object_id"
+    _TARGET_SCHEMA="_schema"
+    _TARGET_SCHEMA_ID="_schema_id"
+    _TARGET_MGT_DATA="_mgt_data"
+    _TARGET_AGENT_ID="_agent_id"
+
+    _PRED_PACKAGE="_package_name"
+    _PRED_CLASS="_class_name"
+    _PRED_TYPE="_type"
+    _PRED_HASH="_has_str"
+    _PRED_SCHEMA_ID="_schema_id"
+    _PRED_VENDOR="_vendor"
+    _PRED_PRODUCT="_product"
+    _PRED_NAME="_name"
+    _PRED_AGENT_ID="_agent_id"
+    _PRED_PRIMARY_KEY="_primary_key"
+
+    _CMP_EQ="eq"
+    _CMP_NE="ne"
+    _CMP_LT="lt"
+    _CMP_LE="le"
+    _CMP_GT="gt"
+    _CMP_GE="ge"
+    _CMP_RE="re_match"
+    _CMP_PRESENT="exists"
+    _CMP_TRUE="true"
+    _CMP_FALSE="false"
+
+    _LOGIC_AND="and"
+    _LOGIC_OR="or"
+    _LOGIC_NOT="not"
+
     def __init__(self, kwargs={}):
         pass
 #         if "impl" in kwargs:

Modified: qpid/branches/qmfv2/qpid/python/qmf/qmfConsole.py
URL: http://svn.apache.org/viewvc/qpid/branches/qmfv2/qpid/python/qmf/qmfConsole.py?rev=891456&r1=891455&r2=891456&view=diff
==============================================================================
--- qpid/branches/qmfv2/qpid/python/qmf/qmfConsole.py (original)
+++ qpid/branches/qmfv2/qpid/python/qmf/qmfConsole.py Wed Dec 16 22:24:57 2009
@@ -30,7 +30,8 @@
 from qpid.messaging import *
 
 from qmfCommon import (AMQP_QMF_DIRECT, AMQP_QMF_NAME_SEPARATOR, AMQP_QMF_AGENT_INDICATION,
-                       AMQP_QMF_AGENT_LOCATE, AgentId, makeSubject, parseSubject, OpCode)
+                       AMQP_QMF_AGENT_LOCATE, AgentId, makeSubject, parseSubject, OpCode,
+                       Query, AgentIdFactory, Notifier, _doQuery)
 
 
 
@@ -67,9 +68,8 @@
     def fetch(self, timeout=None):
         self._cv.acquire()
         try:
-            if len(self._msgs):
-                return self._msgs.pop()
-            self._cv.wait(timeout)
+            if len(self._msgs) == 0:
+                self._cv.wait(timeout)
             if len(self._msgs):
                 return self._msgs.pop()
             return None
@@ -169,6 +169,19 @@
             self.lock.release()
 
 
+    def isValid(self, seq):
+        """
+        True if seq is in use, else False (seq is unknown)
+        """
+        seq = long(seq)
+        self.lock.acquire()
+        try:
+            return seq in self.pending
+        finally:
+            self.lock.release()
+        return False
+
+
 
 #class ObjectProxy(QmfObject):
 class ObjectProxy(object):
@@ -198,11 +211,11 @@
         """
         if not self._agent:
             raise Exception("No Agent associated with this object")
-        newer = self._agent.get_object(Query({"object_id":object_id}), timeout)
+        newer = self._agent.get_object(Query({"object_id":None}), timeout)
         if newer == None:
             logging.error("Failed to retrieve object %s from agent %s" % (str(self), str(self._agent)))
             raise Exception("Failed to retrieve object %s from agent %s" % (str(self), str(self._agent)))
-        self.mergeUpdate(newer)  ### ??? in Rafi's console.py::Object Class
+        #self.mergeUpdate(newer)  ### ??? in Rafi's console.py::Object Class
 
     ### def _merge_update(self, newerObject):
     ### ??? in Rafi's console.py::Object Class
@@ -216,32 +229,36 @@
 
 
 
-class AgentProxy(object):
+class Agent(object):
     """
     A local representation of a remote agent managed by this console.
     """
-    def __init__(self, name):
+    def __init__(self, agent_id, console):
         """
         @type name: AgentId
         @param name: uniquely identifies this agent in the AMQP domain.
         """
-        if not name or not isinstance(name, AgentId):
-            raise Exception( "Attempt to create an Agent without supplying a valid agent name." );
-
-        self._name = name
-        self._address = AMQP_QMF_DIRECT + AMQP_QMF_NAME_SEPARATOR + str(name)
-        self._console = None
+        if not isinstance(agent_id, AgentId):
+            raise TypeError("parameter must be an instance of class AgentId")
+        if not isinstance(console, Console):
+            raise TypeError("parameter must be an instance of class Console")
+
+        self._id = agent_id
+        self._address = AMQP_QMF_DIRECT + AMQP_QMF_NAME_SEPARATOR + str(agent_id)
+        self._console = console
         self._sender = None
         self._packages = [] # list of package names known to this agent
         self._classes = {}  # dict [key:class] of classes known to this agent
         self._subscriptions = [] # list of active standing subscriptions for this agent
-        self._exists = False  # true when Agent Announce is received from this agent
-        logging.debug( "Created AgentProxy with address: [%s]" % self._address )
+        self._announce_timestamp = long(0) # timestamp when last announce received
+        logging.debug( "Created Agent with address: [%s]" % self._address )
 
 
-    def key(self):
-        return str(self._name)
+    def getAgentId(self):
+        return self._id
 
+    def isActive(self):
+        return self._announce_timestamp != 0
     
     def _send_msg(self, msg):
         """
@@ -250,26 +267,11 @@
         msg.reply_to = self._console.address()
         handle = self._console._req_correlation.allocate()
         if handle == 0:
-            raise Exception("Can not allocate a sequence id!")
+            raise Exception("Can not allocate a correlation id!")
         msg.correlation_id = str(handle)
         self._sender.send(msg)
         return handle
 
-
-
-    def _fetch_reply_msg(self, handle, timeout=None):
-        """
-        Low-level routine to wait for an expected reply from the agent.
-        """
-        if handle == 0:
-            raise Exception("Invalid handle")
-        msg = self._console._req_correlation.get_data( handle, timeout )
-        if not msg:
-            logging.debug("timed out waiting for reply message")
-        self._console._req_correlation.release( handle )
-        return msg
-
-
     def get_packages(self):
         """
         Return a list of the names of all packages known to this agent.
@@ -371,22 +373,6 @@
 
 
 
-class Notifier(object):
-    """
-    Virtual base class that defines a call back which alerts the application that
-    a QMF Console notification is pending.
-    """
-    def console_indication(self):
-        """
-        Called when one or more console items are ready for the console application to process.
-        This method may be called by the internal console management thread.  Its purpose is to
-        indicate that the console application should process pending items.
-        """
-        pass
-
-
-
-
 class Console(Thread):
     """
     A Console manages communications to a collection of agents on behalf of an application.
@@ -408,16 +394,17 @@
         self._notifier = notifier
         self._conn = None
         self._session = None
-        # dict of "agent-direct-address":AgentProxy entries
+        self._lock = Lock()
+        # dict of "agent-direct-address":class Agent entries
         self._agent_map = {}
-        self._agent_map_lock = Lock()
         self._direct_recvr = None
         self._announce_recvr = None
         self._locate_sender = None
         self._schema_cache = {}
         self._req_correlation = SequencedWaiter()
         self._operational = False
-        self._agent_discovery = False
+        self._agent_discovery_predicate = None
+        self._default_timeout = 60
         # lock out run() thread
         self._cv = Condition()
         # for passing WorkItems to the application
@@ -514,78 +501,96 @@
         return self._address
 
 
-    def create_agent( self, agent_name ):
+    def _createAgent( self, agent_id ):
         """
         Factory to create/retrieve an agent for this console
         """
-        if not isinstance(agent_name, AgentId):
-            raise TypeError("agent_name must be an instance of AgentId")
+        if not isinstance(agent_id, AgentId):
+            raise TypeError("parameter must be an instance of class AgentId")
 
-        agent = AgentProxy(agent_name)
-
-        self._agent_map_lock.acquire()
+        self._lock.acquire()
         try:
-            if agent_name in self._agent_map:
-                return self._agent_map[agent_name]
+            if agent_id in self._agent_map:
+                return self._agent_map[agent_id]
 
-            agent._console = self
+            agent = Agent(agent_id, self)
             agent._sender = self._session.sender(agent._address)
-            self._agent_map[agent_name] = agent
+            self._agent_map[agent_id] = agent
         finally:
-            self._agent_map_lock.release()
+            self._lock.release()
 
         return agent
 
 
 
-    def destroy_agent( self, agent ):
+    def destroyAgent( self, agent ):
         """
         Undoes create.
         """
-        if not isinstance(agent, AgentProxy):
-            raise TypeError("agent must be an instance of AgentProxy")
+        if not isinstance(agent, Agent):
+            raise TypeError("agent must be an instance of class Agent")
 
-        self._agent_map_lock.acquire()
+        self._lock.acquire()
         try:
-            if agent._name in self._agent_map:
-                del self._agent_map[agent._name]
+            if agent._id in self._agent_map:
+                del self._agent_map[agent._id]
         finally:
-            self._agent_map_lock.release()
+            self._lock.release()
 
 
 
 
-    def find_agent(self, agent_name, timeout=None ):
+    def findAgent(self, agent_id, timeout=None ):
         """
-        Given the name of a particular agent, return an AgentProxy representing 
-        that agent.  Return None if the agent does not exist.
+        Given the id of a particular agent, return an instance of class Agent 
+        representing that agent.  Return None if the agent does not exist.
         """
-        self._agent_map_lock.acquire()
+        if not isinstance(agent_id, AgentId):
+            raise TypeError("parameter must be an instance of class AgentId")
+
+        self._lock.acquire()
         try:
-            if agent_name in self._agent_map:
-                return self._agent_map[agent_name]
+            if agent_id in self._agent_map:
+                return self._agent_map[agent_id]
         finally:
-            self._agent_map_lock.release()
+            self._lock.release()
 
-        new_agent = self.create_agent(agent_name)
-        msg = Message(subject=makeSubject(OpCode.agent_locate),
-                      properties={"method":"request"},
-                      content={"query": {"vendor" : agent_name.vendor(),
-                                         "product" : agent_name.product(),
-                                         "name" : agent_name.name()}})
-        handle = new_agent._send_msg(msg)
-        if handle == 0:
-            raise Exception("Failed to send Agent locate message to agent %s" % str(agent_name))
+        # agent not present yet - ping it with an agent_locate
 
-        msg = new_agent._fetch_reply_msg(handle, timeout)
-        if not msg:
-            logging.debug("Unable to contact agent '%s' - no reply." % agent_name)
-            self.destroy_agent(new_agent)
+        handle = self._req_correlation.allocate()
+        if handle == 0:
+            raise Exception("Can not allocate a correlation id!")
+        try:
+            tmp_sender = self._session.sender(AMQP_QMF_DIRECT + AMQP_QMF_NAME_SEPARATOR + str(agent_id))
+            msg = Message(subject=makeSubject(OpCode.agent_locate),
+                          properties={"method":"request"},
+                          content={"query": {Query._TARGET: {Query._TARGET_AGENT_ID:None},
+                                             Query._PREDICATE:
+                                                 [Query._LOGIC_AND,
+                                                  [Query._CMP_EQ, "vendor",  agent_id.vendor()],
+                                                  [Query._CMP_EQ, "product", agent_id.product()],
+                                                  [Query._CMP_EQ, "name", agent_id.name()]]}})
+            msg.reply_to = self._address
+            msg.correlation_id = str(handle)
+            tmp_sender.send( msg )
+        except SendError, e:
+            logging.error(str(e))
+            self._req_correlation.release(handle)
             return None
-        # @todo - for now, dump the message
-        logging.info( "agent-locate reply received for %s" % agent_name)
-        return new_agent
 
+        if not timeout:
+            timeout = self._default_timeout
+
+        new_agent = None
+        self._req_correlation.get_data( handle, timeout )
+        self._req_correlation.release(handle)
+        self._lock.acquire()
+        try:
+            if agent_id in self._agent_map:
+                new_agent = self._agent_map[agent_id]
+        finally:
+            self._lock.release()
+        return new_agent
 
 
     def run(self):
@@ -600,14 +605,14 @@
             try:
                 msg = self._announce_recvr.fetch(timeout = 0)
                 if msg:
-                    self._rcv_announce(msg)
+                    self._dispatch(msg, _direct=False)
             except:
                 pass
 
             try:
                 msg = self._direct_recvr.fetch(timeout = 0)
                 if msg:
-                    self._rcv_direct(msg)
+                    self._dispatch(msg, _direct=True)
             except:
                 pass
 
@@ -626,7 +631,7 @@
 
                 _callback_thread = currentThread()
                 logging.info("Calling console indication")
-                self._notifier.console_indication()
+                self._notifier.indication()
                 _callback_thread = None
 
             while self._operational and \
@@ -640,79 +645,120 @@
 
     # called by run() thread ONLY
     #
-    def _rcv_announce(self, msg):
+    def _dispatch(self, msg, _direct=True):
         """
-        PRIVATE: Process a message received on the announce receiver
+        PRIVATE: Process a message received from an Agent
         """
-        logging.info( "Announce message received!" )
+        logging.error( "Message received from Agent! [%s]" % msg )
         try:
             version,opcode = parseSubject(msg.subject)
+            # @todo: deal with version mismatch!!!
         except:
-            logging.debug("Ignoring unrecognized broadcast message '%s'" % 
msg.subject)
+            logging.error("Ignoring unrecognized broadcast message '%s'" % 
msg.subject)
             return
 
-        amap = {}; props = {}
+        cmap = {}; props = {}
         if msg.content_type == "amqp/map":
-            amap = msg.content
+            cmap = msg.content
         if msg.properties:
             props = msg.properties
 
         if opcode == OpCode.agent_ind:
-            # agent indication
-            if "name" in amap:
-                if self._agent_discovery:
-                    ind = amap["name"]
-                    if "vendor" in ind and "product" in ind and "name" in ind:
-
-                        agent = self.create_agent(AgentId( ind["vendor"],
-                                                           ind["product"],
-                                                           ind["name"] ))
-                        if not agent._exists:
-                            # new agent
-                            agent._exists = True
-                            logging.info("AGENT_ADDED for %s" % agent)
-                            wi = WorkItem(WorkItem.AGENT_ADDED,
-                                          {"agent": agent})
-                            self._work_q.put(wi)
+            self._handleAgentIndMsg( msg, cmap, version, _direct )
+        elif opcode == OpCode.data_ind:
+            logging.warning("!!! data_ind TBD !!!")
+        elif opcode == OpCode.event_ind:
+            logging.warning("!!! event_ind TBD !!!")
+        elif opcode == OpCode.managed_object:
+            logging.warning("!!! managed_object TBD !!!")
+        elif opcode == OpCode.object_ind:
+            logging.warning("!!! object_ind TBD !!!")
+        elif opcode == OpCode.response:
+            logging.warning("!!! response TBD !!!")
+        elif opcode == OpCode.schema_ind:
+            logging.warning("!!! schema_ind TBD !!!")
+        elif opcode == OpCode.noop:
+             logging.debug("No-op msg received.")
         else:
             logging.warning("Ignoring message with unrecognized 'opcode' 
value: '%s'" % opcode)
 
 
-
-    # called by run() thread ONLY
-    #
-    def _rcv_direct(self, msg):
+    def _handleAgentIndMsg(self, msg, cmap, version, direct):
         """
-        PRIVATE: Process a message sent to my direct receiver
+        Process a received agent-ind message.  This message may be a response
+        to an agent-locate, or it can be an unsolicited agent announce.
+
+        msg     -- the received message; its correlation_id is examined
+        cmap    -- the message's map content, expected to carry the agent id
+                   under Query._TARGET_AGENT_ID
+        version -- protocol version parsed from the subject (not used here)
+        direct  -- True when the message arrived on the direct receiver
+
+        Returns None.  A message without a usable agent id is logged at
+        debug level and dropped.
         """
-        logging.info( "direct message received!" )
+        logging.debug("_handleAgentIndMsg '%s'" % msg)
+
+        # A missing agent id (KeyError) is rejected the same way as a
+        # malformed one, so agent_id is always bound past this point.
+        try:
+            agent_id = AgentIdFactory(cmap[Query._TARGET_AGENT_ID])
+        except:
+            logging.debug("Bad agent-ind message received: '%s'" % msg)
+            return
+
+        ignore = True
+        matched = False
+        correlated = False
         if msg.correlation_id:
-            self._req_correlation.put_data(msg.correlation_id, msg)
+            correlated = self._req_correlation.isValid(msg.correlation_id)
 
+        # A direct reply to one of our own pending requests is always
+        # accepted; anything else must satisfy the discovery predicate,
+        # and is ignored when discovery is disabled (predicate is None).
+        if direct and correlated:
+            ignore = False
+        elif self._agent_discovery_predicate:
+            matched = _doQuery( self._agent_discovery_predicate,
+                                agent_id.mapEncode() )
+            ignore = not matched
+
+        if not ignore:
+            agent = None
+            self._lock.acquire()
+            try:
+                if agent_id in self._agent_map:
+                    agent = self._agent_map[agent_id]
+            finally:
+                self._lock.release()
+
+            if not agent:
+                # need to create and add a new agent
+                agent = self._createAgent(agent_id)
+
+            old_timestamp = agent._announce_timestamp
+            agent._announce_timestamp = time.time()
+
+            # AGENT_ADDED is queued only for a discovery match whose previous
+            # announce timestamp was 0 (presumably a newly created agent --
+            # confirm against _createAgent).
+            if old_timestamp == 0 and matched:
+                logging.debug("AGENT_ADDED for %s" % agent)
+                wi = WorkItem(WorkItem.AGENT_ADDED,
+                              {"agent": agent})
+                self._work_q.put(wi)
+
+            if correlated:
+                # wake up all waiters
+                logging.debug("waking waiters for correlation id %s" % msg.correlation_id)
+                self._req_correlation.put_data(msg.correlation_id, msg)
 
 
-    def enable_agent_discovery(self):
+    def enableAgentDiscovery(self, query=None):
         """
         Called to enable the asynchronous Agent Discovery process.
         Once enabled, AGENT_ADD work items can arrive on the WorkQueue.
         """
-        if not self._agent_discovery:
-            self._agent_discovery = True
-            msg = Message(subject=makeSubject(OpCode.agent_locate),
-                          properties={"method":"request"},
-                          content={"query": {"vendor": "*",
-                                             "product": "*",
-                                             "name": "*"}})
-            self._locate_sender.send(msg)
-
+        self._agent_discovery_predicate = [Query._CMP_TRUE]  # default: match 
all indications
+        if query:
+            if not isinstance(query, dict):
+                raise TypeError("parameter must be of type dict")
+            if Query._TARGET not in query or query[Query._TARGET] != 
{Query._TARGET_AGENT_ID:None}:
+                raise TypeError("query must be for an agent '%s'" % query)
+            if Query._PREDICATE in query:
+                self._agent_discovery_predicate = query[Query._PREDICATE][:]
 
 
-    def disable_agent_discovery(self):
+    def disableAgentDiscovery(self):
         """
         Called to disable the async Agent Discovery process enabled by
-        calling enable_agent_discovery()
+        calling enableAgentDiscovery()
         """
-        self._agent_discovery = False
+        self._agent_discovery_predicate = None
 
 
 
@@ -731,7 +777,7 @@
         """
         try:
             wi = self._work_q.get(True, timeout)
-        except Queue.Empty:
+        except:
             return None
         return wi
 
@@ -1125,8 +1171,7 @@
 
 if __name__ == '__main__':
     # temp test code
-    import time
-    from qmfCommon import (AgentId, SchemaEventClassFactory, qmfTypes, 
SchemaPropertyFactory,
+    from qmfCommon import (SchemaEventClassFactory, qmfTypes, 
SchemaPropertyFactory,
                            SchemaObjectClassFactory, ObjectIdFactory, QmfData, 
QmfDescribed,
                            QmfDescribedFactory, QmfManaged, QmfManagedFactory, 
QmfDataFactory,
                            QmfEvent)
@@ -1142,7 +1187,7 @@
     _myConsole.add_connection( _c )
 
     logging.info( "Finding Agent" )
-    _myAgent = _myConsole.find_agent( AgentId( "redhat.com", "agent", "tross" 
), 5 )
+    _myAgent = _myConsole.findAgent( AgentId( "redhat.com", "agent", "tross" 
), 5 )
 
     logging.info( "Agent Found: %s" % _myAgent )
 
@@ -1159,7 +1204,7 @@
             self._myContext = context
             self.WorkAvailable = False
 
-        def console_indication(self):
+        def indication(self):
             print("Indication received! context=%d" % self._myContext)
             self.WorkAvailable = True
 
@@ -1168,7 +1213,7 @@
     _myConsole = Console(notifier=_noteMe)
     _myConsole.add_connection( _c )
 
-    _myConsole.enable_agent_discovery()
+    _myConsole.enableAgentDiscovery()
     logging.info("Waiting...")
 
 

Modified: qpid/branches/qmfv2/qpid/python/qmf/test/agent_test.py
URL: 
http://svn.apache.org/viewvc/qpid/branches/qmfv2/qpid/python/qmf/test/agent_test.py?rev=891456&r1=891455&r2=891456&view=diff
==============================================================================
--- qpid/branches/qmfv2/qpid/python/qmf/test/agent_test.py (original)
+++ qpid/branches/qmfv2/qpid/python/qmf/test/agent_test.py Wed Dec 16 22:24:57 
2009
@@ -1,58 +1,100 @@
 import logging
 import time
+from threading import Semaphore
+
 
 from qpid.messaging import *
 from qmfCommon import (AgentId, SchemaEventClassFactory, qmfTypes, 
SchemaProperty,
                        SchemaObjectClass, ObjectIdFactory, QmfData, 
QmfDescribed,
                        QmfDescribedFactory, QmfManaged, QmfManagedFactory, 
QmfDataFactory,
-                       QmfEvent, SchemaMethod)
+                       QmfEvent, SchemaMethod, Notifier)
 from qmfAgent import (Agent, QmfAgentData)
 
 
-class MyAgent(object):
-    def main(self):
 
-        self._agent = Agent( "redhat.com", "qmf", "testAgent" )
+class ExampleNotifier(Notifier):
+    def __init__(self):
+        self._sema4 = Semaphore(0)   # locked
+
+    def indication(self):
+        self._sema4.release()
+
+    def waitForWork(self):
+        logging.error("Waiting for event...")
+        self._sema4.acquire()
+        logging.error("...event present")
+
+
+
+#
+# An example agent application
+#
+
+_notifier = ExampleNotifier()
+_agent = Agent( "redhat.com", "qmf", "testAgent", _notifier )
         
-        # Dynamically construct a class schema
+# Dynamically construct a class schema
+
+_schema = SchemaObjectClass( "MyPackage", "MyClass",
+                             desc="A test data schema",
+                             _pkey=["index1", "index2"] )
+# add properties
+_schema.addProperty( "index1",
+                     SchemaProperty(qmfTypes.TYPE_UINT8))
+_schema.addProperty( "index2",
+                     SchemaProperty(qmfTypes.TYPE_LSTR))
+
+# add method
+_meth = SchemaMethod( _desc="A test method" )
+_meth.addArgument( "arg1", SchemaProperty(qmfTypes.TYPE_UINT32) )
+_meth.addArgument( "arg2", SchemaProperty(qmfTypes.TYPE_LSTR) )
+_meth.addArgument( "arg3", SchemaProperty(qmfTypes.TYPE_BOOL) )
+
+_schema.addMethod( "meth_1", _meth )
+
+# Add schema to Agent
 
-        _schema = SchemaObjectClass( "MyPackage", "MyClass",
-                                     desc="A test data schema",
-                                     _pkey=["index1", "index2"] )
-        # add properties
-        _schema.addProperty( "index1",
-                             SchemaProperty(qmfTypes.TYPE_UINT8))
-        _schema.addProperty( "index2",
-                             SchemaProperty(qmfTypes.TYPE_LSTR))
+_agent.registerObjectClass(_schema)
 
-        # add method
-        _meth = SchemaMethod( _desc="A test method" )
-        _meth.addArgument( "arg1", SchemaProperty(qmfTypes.TYPE_UINT32) )
-        _meth.addArgument( "arg2", SchemaProperty(qmfTypes.TYPE_LSTR) )
-        _meth.addArgument( "arg3", SchemaProperty(qmfTypes.TYPE_BOOL) )
+# instantiate managed data objects matching the schema
 
-        _schema.addMethod( "meth_3", _meth )
+_obj = QmfAgentData( _agent, _schema )
+_obj.setProperty("index1", 100)
+_obj.setProperty("index2", "a name" )
 
-        # Add schema to Agent
+_agent.addObject( _obj )
+_agent.addObject( QmfAgentData( _agent, _schema,
+                                _props={"index1":99, 
+                                        "index2": "another name"} ))
 
-        self._agent.registerObjectClass(_schema)
+## Now connect to the broker
 
-        # instantiate managed data objects matching the schema
+_c = Connection("localhost")
+_c.connect()
+_agent.setConnection(_c)
 
-        obj = QmfAgentData( self._agent, _schema )
-        obj.setProperty("index1", 100)
-        obj.setProperty("index2", "a name" )
 
+# Main loop: block until the notifier signals, then drain the agent's
+# work queue.  Any exception raised inside the loop ends it.
+_done = False
+while not _done:
+    try:
+        _notifier.waitForWork()
 
+        _wi = _agent.getNextWorkItem(timeout=0)
+        while _wi:
+            print("work item %d:%s" % (_wi.getType(), str(_wi.getParams())))
+            _agent.releaseWorkItem(_wi)
+            # Same accessor as the first fetch above: a misspelled name
+            # raises AttributeError, which the bare except below would
+            # silently treat as a shutdown request.
+            _wi = _agent.getNextWorkItem(timeout=0)
+    except:
+        logging.info( "shutting down..." )
+        _done = True
 
+logging.info( "Removing connection... TBD!!!" )
+#_myConsole.remove_connection( _c, 10 )
 
-        return None
+logging.info( "Destroying agent... TBD!!!" )
+#_myConsole.destroy( 10 )
 
+logging.info( "******** agent test done ********" )
 
 
 
-app = MyAgent()
-print( "s='%s'", str(app.main()))

Added: qpid/branches/qmfv2/qpid/python/qmf/test/console_test.py
URL: 
http://svn.apache.org/viewvc/qpid/branches/qmfv2/qpid/python/qmf/test/console_test.py?rev=891456&view=auto
==============================================================================
--- qpid/branches/qmfv2/qpid/python/qmf/test/console_test.py (added)
+++ qpid/branches/qmfv2/qpid/python/qmf/test/console_test.py Wed Dec 16 
22:24:57 2009
@@ -0,0 +1,67 @@
+import logging
+import time
+from threading import Semaphore
+
+
+from qpid.messaging import *
+from qmfCommon import (Notifier, Query)
+from qmfConsole import Console
+
+
+class ExampleNotifier(Notifier):
+    def __init__(self):
+        self._sema4 = Semaphore(0)   # locked
+
+    def indication(self):
+        self._sema4.release()
+
+    def waitForWork(self):
+        logging.error("Waiting for event...")
+        self._sema4.acquire()
+        logging.error("...event present")
+
+
+logging.getLogger().setLevel(logging.INFO)
+
+logging.info( "Starting Connection" )
+_c = Connection("localhost")
+_c.connect()
+
+logging.info( "Starting Console" )
+
+_notifier = ExampleNotifier()
+_myConsole = Console(notifier=_notifier)
+_myConsole.add_connection( _c )
+
+# Discover only agents from vendor "redhat.com" that 
+# are a "qmf" product....
+# @todo: replace "manual" query construction with 
+# a formal class-based Query API
+_query = {Query._TARGET: {Query._TARGET_AGENT_ID:None},
+          Query._PREDICATE:
+              [Query._LOGIC_AND,
+               [Query._CMP_EQ, "vendor",  "redhat.com"],
+               [Query._CMP_EQ, "product", "qmf"]]}
+
+_myConsole.enableAgentDiscovery(_query)
+
+_done = False
+while not _done:
+    try:
+        _notifier.waitForWork()
+
+        _wi = _myConsole.get_next_workitem(timeout=0)
+        while _wi:
+            print("!!! work item received %d:%s" % (_wi.getType(), 
str(_wi.getParams())))
+            _wi = _myConsole.get_next_workitem(timeout=0)
+    except:
+        logging.info( "shutting down..." )
+        _done = True
+
+logging.info( "Removing connection" )
+_myConsole.remove_connection( _c, 10 )
+
+logging.info( "Destroying console:" )
+_myConsole.destroy( 10 )
+
+logging.info( "******** console test done ********" )



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]

Reply via email to