Modified: qpid/branches/qmfv2/qpid/python/qmf/qmfConsole.py URL: http://svn.apache.org/viewvc/qpid/branches/qmfv2/qpid/python/qmf/qmfConsole.py?rev=898057&r1=898056&r2=898057&view=diff ============================================================================== --- qpid/branches/qmfv2/qpid/python/qmf/qmfConsole.py (original) +++ qpid/branches/qmfv2/qpid/python/qmf/qmfConsole.py Mon Jan 11 20:46:07 2010 @@ -30,10 +30,9 @@ from qpid.messaging import * -from qmfCommon import (AMQP_QMF_DIRECT, AMQP_QMF_NAME_SEPARATOR, AMQP_QMF_AGENT_INDICATION, - AMQP_QMF_AGENT_LOCATE, AgentId, makeSubject, parseSubject, OpCode, - QmfQuery, AgentIdFactory, Notifier, QmfQueryPredicate, MsgKey, - QmfData) +from qmfCommon import (makeSubject, parseSubject, OpCode, QmfQuery, Notifier, + QmfQueryPredicate, MsgKey, QmfData, QmfAddress, + AMQP_QMF_AGENT_LOCATE, AMQP_QMF_AGENT_INDICATION) @@ -186,69 +185,106 @@ return False +##============================================================================== +## DATA MODEL +##============================================================================== + -#class ObjectProxy(QmfObject): -class ObjectProxy(object): +class QmfConsoleData(QmfData): """ - A local representation of a QmfObject that is managed by a remote agent. + Console's representation of an managed QmfData instance. """ - def __init__(self, agent, cls, kwargs={}): + def __init__(self, map_, agent, _schema=None): + super(QmfConsoleData, self).__init__(_map=map_, + _schema=_schema, + _const=True) + self._agent = agent + + def get_timestamps(self): """ - @type agent: qmfConsole.AgentProxy - @param agent: Agent that manages this object. - @type cls: qmfCommon.SchemaObjectClass - @param cls: Schema that describes the class. - @type kwargs: dict - @param kwargs: ??? supported keys ??? + Returns a list of timestamps describing the lifecycle of + the object. All timestamps are represented by the AMQP + timestamp type. [0] = time of last update from Agent, + [1] = creation timestamp + [2] = deletion timestamp, or zero if not + deleted. """ - # QmfObject.__init__(self, cls, kwargs) - self._agent = agent + return [self._utime, self._ctime, self._dtime] + + def get_create_time(self): + """ + returns the creation timestamp + """ + return self._ctime - # def update(self): - def refresh(self, timeout = None): + def get_update_time(self): + """ + returns the update timestamp """ - Called to re-fetch the current state of the object from the agent. This updates - the contents of the object to their most current values. + return self._utime - @rtype: bool - @return: True if refresh succeeded. Refresh may fail if agent does not respond. + def get_delete_time(self): + """ + returns the deletion timestamp, or zero if not yet deleted. """ - if not self._agent: - raise Exception("No Agent associated with this object") - newer = self._agent.get_object(QmfQuery({"object_id":None}), timeout) - if newer == None: - logging.error("Failed to retrieve object %s from agent %s" % (str(self), str(self._agent))) - raise Exception("Failed to retrieve object %s from agent %s" % (str(self), str(self._agent))) - #self.mergeUpdate(newer) ### ??? in Rafi's console.py::Object Class + return self._dtime - ### def _merge_update(self, newerObject): - ### ??? in Rafi's console.py::Object Class + def is_deleted(self): + """ + True if deletion timestamp not zero. + """ + return self._dtime != long(0) + def refresh(self, _reply_handle=None, _timeout=None): + """ + request that the Agent update the value of this object's + contents. + """ + logging.error(" TBD!!!") + return None - ### def is_deleted(self): - ### ??? in Rafi's console.py::Object Class + def invoke_method(self, name, _in_args=None, _reply_handle=None, + _timeout=None): + """ + invoke the named method. + """ + logging.error(" TBD!!!") + return None - def key(self): pass +class QmfLocalData(QmfData): + """ + Console's representation of an unmanaged QmfData instance. There + is no remote agent associated with this instance. The Console has + full control over this instance. + """ + def __init__(self, values, _subtypes={}, _tag=None, _object_id=None, + _schema=None): + # timestamp in millisec since epoch UTC + ctime = long(time.time() * 1000) + super(QmfLocalData, self).__init__(_values=values, + _subtypes=_subtypes, _tag=_tag, + _object_id=_object_id, + _schema=_schema, _ctime=ctime, + _utime=ctime, _const=False) class Agent(object): """ A local representation of a remote agent managed by this console. """ - def __init__(self, agent_id, console): + def __init__(self, name, console): """ @type name: AgentId @param name: uniquely identifies this agent in the AMQP domain. """ - if not isinstance(agent_id, AgentId): - raise TypeError("parameter must be an instance of class AgentId") + if not isinstance(console, Console): raise TypeError("parameter must be an instance of class Console") - self._id = agent_id - self._address = AMQP_QMF_DIRECT + AMQP_QMF_NAME_SEPARATOR + str(agent_id) + self._name = name + self._address = QmfAddress.direct(name, console._domain) self._console = console self._sender = None self._packages = {} # map of {package-name:[list of class-names], } for this agent @@ -257,8 +293,8 @@ logging.debug( "Created Agent with address: [%s]" % self._address ) - def getAgentId(self): - return self._id + def get_name(self): + return self._name def isActive(self): return self._announce_timestamp != None @@ -267,7 +303,7 @@ """ Low-level routine to asynchronously send a message to this agent. """ - msg.reply_to = self._console.address() + msg.reply_to = str(self._console._address) # handle = self._console._req_correlation.allocate() # if handle == 0: # raise Exception("Can not allocate a correlation id!") @@ -329,7 +365,7 @@ pass def __repr__(self): - return self._address + return str(self._address) def __str__(self): return self.__repr__() @@ -339,7 +375,7 @@ """ msg = Message(subject=makeSubject(OpCode.get_query), properties={"method":"request"}, - content={MsgKey.query: query.mapEncode()}) + content={MsgKey.query: query.map_encode()}) self._sendMsg( msg, correlation_id ) @@ -389,7 +425,7 @@ """ A Console manages communications to a collection of agents on behalf of an application. """ - def __init__(self, name=None, notifier=None, + def __init__(self, name=None, _domain=None, notifier=None, reply_timeout = 60, # agent_timeout = 120, agent_timeout = 60, @@ -403,10 +439,12 @@ @param kwargs: ??? Unused """ Thread.__init__(self) - self._name = name - if not self._name: + if not name: self._name = "qmfc-%s.%d" % (platform.node(), os.getpid()) - self._address = AMQP_QMF_DIRECT + AMQP_QMF_NAME_SEPARATOR + self._name + else: + self._name = str(name) + self._domain = _domain + self._address = QmfAddress.direct(self._name, self._domain) self._notifier = notifier self._lock = Lock() self._conn = None @@ -467,10 +505,24 @@ raise Exception( "Multiple connections per Console not supported." ); self._conn = conn self._session = conn.session(name=self._name) - self._direct_recvr = self._session.receiver(self._address, capacity=1) - self._announce_recvr = self._session.receiver(AMQP_QMF_AGENT_INDICATION, + self._direct_recvr = self._session.receiver(str(self._address) + + ";{create:always," + " node-properties:" + " {type:topic," + " x-properties:" + " {type:direct}}}", + capacity=1) + ind_addr = QmfAddress.topic(AMQP_QMF_AGENT_INDICATION, self._domain) + logging.debug("agent.ind addr=%s" % ind_addr) + self._announce_recvr = self._session.receiver(str(ind_addr) + + ";{create:always," + " node-properties:{type:topic}}", capacity=1) - self._locate_sender = self._session.sender(AMQP_QMF_AGENT_LOCATE) + locate_addr = QmfAddress.topic(AMQP_QMF_AGENT_LOCATE, self._domain) + logging.debug("agent.locate addr=%s" % locate_addr) + self._locate_sender = self._session.sender(str(locate_addr) + + ";{create:always," + " node-properties:{type:topic}}") # # Now that receivers are created, fire off the receive thread... # @@ -495,7 +547,7 @@ if self.isAlive(): # kick my thread to wake it up logging.debug("Making temp sender for [%s]" % self._address) - tmp_sender = self._session.sender(self._address) + tmp_sender = self._session.sender(str(self._address)) try: msg = Message(subject=makeSubject(OpCode.noop)) tmp_sender.send( msg, sync=True ) @@ -535,21 +587,17 @@ finally: self._lock.release() - - - - def findAgent(self, agent_id, timeout=None ): + def findAgent(self, name, timeout=None ): """ Given the id of a particular agent, return an instance of class Agent representing that agent. Return None if the agent does not exist. """ - if not isinstance(agent_id, AgentId): - raise TypeError("parameter must be an instance of class AgentId") self._lock.acquire() try: - if agent_id in self._agent_map: - return self._agent_map[agent_id] + agent = self._agent_map.get(name) + if agent: + return agent finally: self._lock.release() @@ -559,17 +607,21 @@ if handle == 0: raise Exception("Can not allocate a correlation id!") try: - tmp_sender = self._session.sender(AMQP_QMF_DIRECT + AMQP_QMF_NAME_SEPARATOR + str(agent_id)) + tmp_sender = self._session.sender(str(QmfAddress.direct(name, + self._domain)) + + ";{create:always," + " node-properties:" + " {type:topic," + " x-properties:" + " {type:direct}}}") + query = QmfQuery({QmfQuery._TARGET: {QmfQuery._TARGET_AGENT:None}, QmfQuery._PREDICATE: - {QmfQuery._LOGIC_AND: - [{QmfQuery._CMP_EQ: ["vendor", agent_id.vendor()]}, - {QmfQuery._CMP_EQ: ["product", agent_id.product()]}, - {QmfQuery._CMP_EQ: ["name", agent_id.name()]}]}}) + {QmfQuery._CMP_EQ: ["_name", name]}}) msg = Message(subject=makeSubject(OpCode.agent_locate), properties={"method":"request"}, - content={MsgKey.query: query.mapEncode()}) - msg.reply_to = self._address + content={MsgKey.query: query.map_encode()}) + msg.reply_to = str(self._address) msg.correlation_id = str(handle) logging.debug("Sending Agent Locate (%s)" % time.time()) tmp_sender.send( msg ) @@ -588,13 +640,43 @@ logging.debug("Agent Locate wait ended (%s)" % time.time()) self._lock.acquire() try: - if agent_id in self._agent_map: - new_agent = self._agent_map[agent_id] + new_agent = self._agent_map.get(name) finally: self._lock.release() return new_agent + def doQuery(self, agent, query, timeout=None ): + """ + """ + + handle = self._req_correlation.allocate() + if handle == 0: + raise Exception("Can not allocate a correlation id!") + try: + logging.debug("Sending Query to Agent (%s)" % time.time()) + agent._sendQuery(query, handle) + except SendError, e: + logging.error(str(e)) + self._req_correlation.release(handle) + return None + + if not timeout: + timeout = self._reply_timeout + + logging.debug("Waiting for response to Query (%s)" % timeout) + reply = self._req_correlation.get_data(handle, timeout) + self._req_correlation.release(handle) + logging.debug("Agent Query wait ended (%s)" % time.time()) + if reply: + print("Agent Query Reply='%s'" % reply) + return reply.content + else: + print("Agent Query FAILED!!!") + return None + + + def run(self): global _callback_thread # @@ -607,7 +689,6 @@ try: msg = self._announce_recvr.fetch(timeout = 0) if msg: - logging.error( "Announce Msg r...@%s: [%s]" % (time.time(), msg) ) self._dispatch(msg, _direct=False) except Empty: pass @@ -615,7 +696,6 @@ try: msg = self._direct_recvr.fetch(timeout = 0) if msg: - logging.error( "Direct Msg r...@%s: [%s]" % (time.time(), msg) ) self._dispatch(msg, _direct=True) except Empty: pass @@ -654,6 +734,9 @@ """ PRIVATE: Process a message received from an Agent """ + + logging.error( "Message received from Agent! [%s]" % msg ) + try: version,opcode = parseSubject(msg.subject) # @todo: deal with version mismatch!!! @@ -670,7 +753,7 @@ if opcode == OpCode.agent_ind: self._handleAgentIndMsg( msg, cmap, version, _direct ) elif opcode == OpCode.data_ind: - logging.warning("!!! data_ind TBD !!!") + self._handleDataIndMsg(msg, cmap, version, _direct) elif opcode == OpCode.event_ind: logging.warning("!!! event_ind TBD !!!") elif opcode == OpCode.managed_object: @@ -696,7 +779,8 @@ if MsgKey.agent_info in cmap: try: - agent_id = AgentIdFactory(cmap[MsgKey.agent_info]) + # TODO: fix + name = cmap[MsgKey.agent_info]["_name"] except: logging.warning("Bad agent-ind message received: '%s'" % msg) return @@ -709,21 +793,22 @@ if direct and correlated: ignore = False elif self._agent_discovery_filter: - matched = self._agent_discovery_filter.evaluate(QmfData(agent_id.mapEncode())) - ignore = not matched + logging.error("FIXME: agent discovery filter - new agent name style") + # matched = self._agent_discovery_filter.evaluate(QmfData(agent_id.mapEncode())) + # ignore = not matched + matched = True; ignore = False # for now if not ignore: agent = None self._lock.acquire() try: - if agent_id in self._agent_map: - agent = self._agent_map[agent_id] + agent = self._agent_map.get(name) finally: self._lock.release() if not agent: # need to create and add a new agent - agent = self._createAgent(agent_id) + agent = self._createAgent(name) # lock out expiration scanning code self._lock.acquire() @@ -746,6 +831,22 @@ + + def _handleDataIndMsg(self, msg, cmap, version, direct): + """ + Process a received data-ind message. + """ + logging.debug("_handleDataIndMsg '%s' (%s)" % (msg, time.time())) + + if not self._req_correlation.isValid(msg.correlation_id): + logging.error("FIXME: uncorrelated data indicate??? msg='%s'" % str(msg)) + return + + # wake up all waiters + logging.error("waking waiters for correlation id %s" % msg.correlation_id) + self._req_correlation.put_data(msg.correlation_id, msg) + + def _expireAgents(self): """ Check for expired agents and issue notifications when they expire. @@ -777,21 +878,26 @@ - def _createAgent( self, agent_id ): + def _createAgent( self, name ): """ Factory to create/retrieve an agent for this console """ - if not isinstance(agent_id, AgentId): - raise TypeError("parameter must be an instance of class AgentId") self._lock.acquire() try: - if agent_id in self._agent_map: - return self._agent_map[agent_id] + agent = self._agent_map.get(name) + if agent: + return agent + + agent = Agent(name, self) + agent._sender = self._session.sender(str(agent._address) + + ";{create:always," + " node-properties:" + " {type:topic," + " x-properties:" + " {type:direct}}}") - agent = Agent(agent_id, self) - agent._sender = self._session.sender(agent._address) - self._agent_map[agent_id] = agent + self._agent_map[name] = agent finally: self._lock.release() @@ -1235,33 +1341,13 @@ if __name__ == '__main__': # temp test code - from qmfCommon import (SchemaEventClassFactory, qmfTypes, SchemaPropertyFactory, - SchemaObjectClassFactory, ObjectIdFactory, QmfDescribed, - QmfDescribedFactory, QmfManaged, QmfManagedFactory, QmfDataFactory, - QmfEvent) - logging.getLogger().setLevel(logging.INFO) - - logging.info( "Starting Connection" ) - _c = Connection("localhost") - _c.connect() - #c.start() - - logging.info( "Starting Console" ) - _myConsole = Console() - _myConsole.addConnection( _c ) - - logging.info( "Finding Agent" ) - _myAgent = _myConsole.findAgent( AgentId( "redhat.com", "agent", "tross" ), 5 ) + from qmfCommon import (qmfTypes, QmfData, + QmfEvent, SchemaClassId, SchemaEventClass, + SchemaProperty, SchemaObjectClass) - logging.info( "Agent Found: %s" % _myAgent ) - - logging.info( "Removing connection" ) - _myConsole.removeConnection( _c, 10 ) - - logging.info( "Destroying console:" ) - _myConsole.destroy( 10 ) + logging.getLogger().setLevel(logging.INFO) - logging.info( "************* Starting Async Console **************" ) + logging.info( "************* Creating Async Console **************" ) class MyNotifier(Notifier): def __init__(self, context): @@ -1275,239 +1361,179 @@ _noteMe = MyNotifier( 666 ) _myConsole = Console(notifier=_noteMe) - _myConsole.addConnection( _c ) _myConsole.enableAgentDiscovery() logging.info("Waiting...") - while not _noteMe.WorkAvailable: - try: - print("No work yet...sleeping!") - time.sleep(1) - except KeyboardInterrupt: - break - - - print("Work available = %d items!" % _myConsole.getWorkItemCount()) - _wi = _myConsole.getNextWorkItem(timeout=0) - while _wi: - print("work item %d:%s" % (_wi.getType(), str(_wi.getParams()))) - _wi = _myConsole.getNextWorkItem(timeout=0) - - - logging.info( "Removing connection" ) - _myConsole.removeConnection( _c, 10 ) - logging.info( "Destroying console:" ) _myConsole.destroy( 10 ) logging.info( "******** Messing around with Schema ********" ) - _sec = SchemaEventClassFactory( { "schema_id": # SchemaClassId map - {"package_name": "myPackage", - "class_name": "myClass", - "type": "event"}, - "desc": "A typical event schema", - "properties": {"Argument-1": - {"amqp_type": qmfTypes.TYPE_UINT8, - "min": 0, - "max": 100, - "unit": "seconds", - "desc": "sleep value"}, - "Argument-2": - {"amqp_type": qmfTypes.TYPE_LSTR, - "maxlen": 100, - "desc": "a string argument"}}} ) - print("_sec=%s" % _sec.getClassId()) - print("_sec.gePropertyCount()=%d" % _sec.getPropertyCount() ) - print("_sec.getProperty('Argument-1`)=%s" % _sec.getProperty('Argument-1') ) - print("_sec.getProperty('Argument-2`)=%s" % _sec.getProperty('Argument-2') ) + _sec = SchemaEventClass( _classId=SchemaClassId("myPackage", "myClass", + stype=SchemaClassId.TYPE_EVENT), + _desc="A typical event schema", + _props={"Argument-1": SchemaProperty(_type_code=qmfTypes.TYPE_UINT8, + kwargs = {"min":0, + "max":100, + "unit":"seconds", + "desc":"sleep value"}), + "Argument-2": SchemaProperty(_type_code=qmfTypes.TYPE_LSTR, + kwargs={"maxlen":100, + "desc":"a string argument"})}) + print("_sec=%s" % _sec.get_class_id()) + print("_sec.gePropertyCount()=%d" % _sec.get_property_count() ) + print("_sec.getProperty('Argument-1`)=%s" % _sec.get_property('Argument-1') ) + print("_sec.getProperty('Argument-2`)=%s" % _sec.get_property('Argument-2') ) try: - print("_sec.getProperty('not-found')=%s" % _sec.getProperty('not-found') ) + print("_sec.getProperty('not-found')=%s" % _sec.get_property('not-found') ) except: pass - print("_sec.getProperties()='%s'" % _sec.getProperties()) + print("_sec.getProperties()='%s'" % _sec.get_properties()) print("Adding another argument") - _arg3 = SchemaPropertyFactory( { "amqp_type": qmfTypes.TYPE_BOOL, - "dir": "IO", - "desc": "a boolean argument"} ) - _sec.addProperty('Argument-3', _arg3) - print("_sec=%s" % _sec.getClassId()) - print("_sec.getPropertyCount()=%d" % _sec.getPropertyCount() ) - print("_sec.getProperty('Argument-1')=%s" % _sec.getProperty('Argument-1') ) - print("_sec.getProperty('Argument-2')=%s" % _sec.getProperty('Argument-2') ) - print("_sec.getProperty('Argument-3')=%s" % _sec.getProperty('Argument-3') ) + _arg3 = SchemaProperty( _type_code=qmfTypes.TYPE_BOOL, + kwargs={"dir":"IO", + "desc":"a boolean argument"}) + _sec.add_property('Argument-3', _arg3) + print("_sec=%s" % _sec.get_class_id()) + print("_sec.getPropertyCount()=%d" % _sec.get_property_count() ) + print("_sec.getProperty('Argument-1')=%s" % _sec.get_property('Argument-1') ) + print("_sec.getProperty('Argument-2')=%s" % _sec.get_property('Argument-2') ) + print("_sec.getProperty('Argument-3')=%s" % _sec.get_property('Argument-3') ) - print("_arg3.mapEncode()='%s'" % _arg3.mapEncode() ) + print("_arg3.mapEncode()='%s'" % _arg3.map_encode() ) - _secmap = _sec.mapEncode() + _secmap = _sec.map_encode() print("_sec.mapEncode()='%s'" % _secmap ) - _sec2 = SchemaEventClassFactory( _secmap ) + _sec2 = SchemaEventClass( _map=_secmap ) - print("_sec=%s" % _sec.getClassId()) - print("_sec2=%s" % _sec2.getClassId()) + print("_sec=%s" % _sec.get_class_id()) + print("_sec2=%s" % _sec2.get_class_id()) - - - - _soc = SchemaObjectClassFactory( {"schema_id": {"package_name": "myOtherPackage", - "class_name": "myOtherClass", - "type": "data"}, - "desc": "A test data object", - "properties": - {"prop1": {"amqp_type": qmfTypes.TYPE_UINT8, - "access": "RO", - "index": True, - "unit": "degrees"}, - "prop2": {"amqp_type": qmfTypes.TYPE_UINT8, - "access": "RW", - "index": True, - "desc": "The Second Property(tm)", - "unit": "radians"}, - "statistics": { "amqp_type": qmfTypes.TYPE_DELTATIME, - "unit": "seconds", - "desc": "time until I retire"}}, - "methods": - {"meth1": {"desc": "A test method", - "arguments": - {"arg1": {"amqp_type": qmfTypes.TYPE_UINT32, - "desc": "an argument 1", - "dir": "I"}, - "arg2": {"amqp_type": qmfTypes.TYPE_BOOL, - "dir": "IO", - "desc": "some weird boolean"}}}, - "meth2": {"desc": "A test method", - "arguments": - {"m2arg1": {"amqp_type": qmfTypes.TYPE_UINT32, - "desc": "an 'nuther argument", - "dir": "I"}}}}, - "primary_key": ["prop2", "prop1"]}) + _soc = SchemaObjectClass( _map = {"_schema_id": {"_package_name": "myOtherPackage", + "_class_name": "myOtherClass", + "_type": "_data"}, + "_desc": "A test data object", + "_values": + {"prop1": {"amqp_type": qmfTypes.TYPE_UINT8, + "access": "RO", + "index": True, + "unit": "degrees"}, + "prop2": {"amqp_type": qmfTypes.TYPE_UINT8, + "access": "RW", + "index": True, + "desc": "The Second Property(tm)", + "unit": "radians"}, + "statistics": { "amqp_type": qmfTypes.TYPE_DELTATIME, + "unit": "seconds", + "desc": "time until I retire"}, + "meth1": {"desc": "A test method", + "arguments": + {"arg1": {"amqp_type": qmfTypes.TYPE_UINT32, + "desc": "an argument 1", + "dir": "I"}, + "arg2": {"amqp_type": qmfTypes.TYPE_BOOL, + "dir": "IO", + "desc": "some weird boolean"}}}, + "meth2": {"desc": "A test method", + "arguments": + {"m2arg1": {"amqp_type": qmfTypes.TYPE_UINT32, + "desc": "an 'nuther argument", + "dir": + "I"}}}}, + "_subtypes": + {"prop1":"qmfProperty", + "prop2":"qmfProperty", + "statistics":"qmfProperty", + "meth1":"qmfMethod", + "meth2":"qmfMethod"}, + "_primary_key_names": ["prop2", "prop1"]}) print("_soc='%s'" % _soc) - print("_soc.getPrimaryKeyList='%s'" % _soc.getPrimaryKeyList()) + print("_soc.getPrimaryKeyList='%s'" % _soc.get_id_names()) - print("_soc.getPropertyCount='%d'" % _soc.getPropertyCount()) - print("_soc.getProperties='%s'" % _soc.getProperties()) - print("_soc.getProperty('prop2')='%s'" % _soc.getProperty('prop2')) + print("_soc.getPropertyCount='%d'" % _soc.get_property_count()) + print("_soc.getProperties='%s'" % _soc.get_properties()) + print("_soc.getProperty('prop2')='%s'" % _soc.get_property('prop2')) - print("_soc.getMethodCount='%d'" % _soc.getMethodCount()) - print("_soc.getMethods='%s'" % _soc.getMethods()) - print("_soc.getMethod('meth2')='%s'" % _soc.getMethod('meth2')) + print("_soc.getMethodCount='%d'" % _soc.get_method_count()) + print("_soc.getMethods='%s'" % _soc.get_methods()) + print("_soc.getMethod('meth2')='%s'" % _soc.get_method('meth2')) - _socmap = _soc.mapEncode() + _socmap = _soc.map_encode() print("_socmap='%s'" % _socmap) - _soc2 = SchemaObjectClassFactory( _socmap ) + _soc2 = SchemaObjectClass( _map=_socmap ) print("_soc='%s'" % _soc) print("_soc2='%s'" % _soc2) - if _soc2.getClassId() == _soc.getClassId(): + if _soc2.get_class_id() == _soc.get_class_id(): print("soc and soc2 are the same schema") logging.info( "******** Messing around with ObjectIds ********" ) - oid = ObjectIdFactory( {"agent_id": {"vendor": "redhat.com", - "product": "mgmt-tool", - "name": "myAgent1"}, - "primary_key": "key1:key2" }) - - print("oid = %s" % oid) - - oid2 = ObjectIdFactory( oid.mapEncode() ) - - print("oid2 = %s" % oid2) - if oid == oid2: - print("oid1 == oid2") - else: - print("oid1 != oid2") - - hashme = {oid: "myoid"} - print("oid hash = %s" % hashme[oid2] ) - - - qd = QmfData( {"prop1":1, "prop2":True, "prop3": {"a":"map"}, "prop4": "astring"} ) + qd = QmfData( _values={"prop1":1, "prop2":True, "prop3": {"a":"map"}, "prop4": "astring"} ) print("qd='%s':" % qd) print("prop1=%d prop2=%s prop3=%s prop4=%s" % (qd.prop1, qd.prop2, qd.prop3, qd.prop4)) - print("qd map='%s'" % qd.mapEncode()) - print("qd getProperty('prop4')='%s'" % qd.getProperty("prop4")) - qd.setProperty("prop4", 4) - print("qd setProperty('prop4', 4)='%s'" % qd.getProperty("prop4")) + print("qd map='%s'" % qd.map_encode()) + print("qd getProperty('prop4')='%s'" % qd.get_value("prop4")) + qd.set_value("prop4", 4, "A test property called 4") + print("qd setProperty('prop4', 4)='%s'" % qd.get_value("prop4")) qd.prop4 = 9 print("qd.prop4 = 9 ='%s'" % qd.prop4) qd["prop4"] = 11 print("qd[prop4] = 11 ='%s'" % qd["prop4"]) - print("qd.mapEncode()='%s'" % qd.mapEncode()) - _qd2 = QmfDataFactory( qd.mapEncode() ) - print("_qd2.mapEncode()='%s'" % _qd2.mapEncode()) + print("qd.mapEncode()='%s'" % qd.map_encode()) + _qd2 = QmfData( _map = qd.map_encode() ) + print("_qd2.mapEncode()='%s'" % _qd2.map_encode()) - _qmfDesc1 = QmfDescribed( _schemaId = _soc.getClassId(), - _props = {"prop1": 1, "statistics": 666, "prop2": 0}) + _qmfDesc1 = QmfConsoleData( {"_values" : {"prop1": 1, "statistics": 666, + "prop2": 0}}, + agent="some agent name?", + _schema = _soc) - print("_qmfDesc1 map='%s'" % _qmfDesc1.mapEncode()) + print("_qmfDesc1 map='%s'" % _qmfDesc1.map_encode()) - _qmfDesc1.setSchema( _soc ) + _qmfDesc1._set_schema( _soc ) - print("_qmfDesc1 props{} = '%s'" % _qmfDesc1.getProperties()) - print("_qmfDesc1 primarykey = '%s'" % _qmfDesc1.getPrimaryKey()) - print("_qmfDesc1 classid = '%s'" % _qmfDesc1.getSchemaClassId()) + print("_qmfDesc1 prop2 = '%s'" % _qmfDesc1.get_value("prop2")) + print("_qmfDesc1 primarykey = '%s'" % _qmfDesc1.get_object_id()) + print("_qmfDesc1 classid = '%s'" % _qmfDesc1.get_schema_class_id()) - _qmfDescMap = _qmfDesc1.mapEncode() + _qmfDescMap = _qmfDesc1.map_encode() print("_qmfDescMap='%s'" % _qmfDescMap) - _qmfDesc2 = QmfDescribedFactory( _qmfDescMap, _schema=_soc ) - - print("_qmfDesc2 map='%s'" % _qmfDesc2.mapEncode()) - print("_qmfDesc2 props = '%s'" % _qmfDesc2.getProperties()) - print("_qmfDesc2 primary key = '%s'" % _qmfDesc2.getPrimaryKey()) - - - _qmfMgd1 = QmfManaged( _agentId=AgentId("redhat.com", "anAgent", "tross"), - _schema = _soc, - _schemaId = _soc.getClassId(), - _props = {"prop1": 11, "prop2": 10, "statistics":999}) - - - print("_qmfMgd1 map='%s'" % _qmfMgd1.mapEncode()) - - print("_qmfMgd1.getObjectId()='%s'" % _qmfMgd1.getObjectId()) - print("_qmfMgd1 props = '%s'" % _qmfMgd1.getProperties()) - - _qmfMgd1Map = _qmfMgd1.mapEncode() - print("_qmfMgd1Map='%s'" % _qmfMgd1Map) - - _qmfMgd2 = QmfManagedFactory( param=_qmfMgd1.mapEncode(), _schema=_soc ) + _qmfDesc2 = QmfData( _map=_qmfDescMap, _schema=_soc ) - print("_qmfMgd2 map='%s'" % _qmfMgd2.mapEncode()) - print("_qmfMgd2 getObjectId() = '%s'" % _qmfMgd2.getObjectId()) - print("_qmfMgd2 props = '%s'" % _qmfMgd2.getProperties()) + print("_qmfDesc2 map='%s'" % _qmfDesc2.map_encode()) + print("_qmfDesc2 prop2 = '%s'" % _qmfDesc2.get_value("prop2")) + print("_qmfDesc2 primary key = '%s'" % _qmfDesc2.get_object_id()) logging.info( "******** Messing around with QmfEvents ********" ) _qmfevent1 = QmfEvent( _timestamp = 1111, - _agentId = AgentId("redhat.com", "whizzbang2000", "ted"), - _schema = _sec, - _props = {"Argument-1": 77, - "Argument-3": True, - "Argument-2": "a string"}) - print("_qmfevent1.mapEncode()='%s'" % _qmfevent1.mapEncode()) - print("_qmfevent1.getTimestamp()='%s'" % _qmfevent1.getTimestamp()) - print("_qmfevent1.getAgentId()='%s'" % _qmfevent1.getAgentId()) + _schema = _sec, + _values = {"Argument-1": 77, + "Argument-3": True, + "Argument-2": "a string"}) + print("_qmfevent1.mapEncode()='%s'" % _qmfevent1.map_encode()) + print("_qmfevent1.getTimestamp()='%s'" % _qmfevent1.get_timestamp()) - _qmfevent1Map = _qmfevent1.mapEncode() + _qmfevent1Map = _qmfevent1.map_encode() - _qmfevent2 = QmfEvent(_map=_qmfevent1Map) - print("_qmfevent2.mapEncode()='%s'" % _qmfevent2.mapEncode()) + _qmfevent2 = QmfEvent(_map=_qmfevent1Map, _schema=_sec) + print("_qmfevent2.mapEncode()='%s'" % _qmfevent2.map_encode()) logging.info( "******** Messing around with Queries ********" )
Modified: qpid/branches/qmfv2/qpid/python/qmf/test/agent_test.py URL: http://svn.apache.org/viewvc/qpid/branches/qmfv2/qpid/python/qmf/test/agent_test.py?rev=898057&r1=898056&r2=898057&view=diff ============================================================================== --- qpid/branches/qmfv2/qpid/python/qmf/test/agent_test.py (original) +++ qpid/branches/qmfv2/qpid/python/qmf/test/agent_test.py Mon Jan 11 20:46:07 2010 @@ -4,10 +4,8 @@ from qpid.messaging import * -from qmfCommon import (AgentId, SchemaEventClassFactory, qmfTypes, SchemaProperty, - SchemaObjectClass, ObjectIdFactory, QmfData, QmfDescribed, - QmfDescribedFactory, QmfManaged, QmfManagedFactory, QmfDataFactory, - QmfEvent, SchemaMethod, Notifier) +from qmfCommon import (qmfTypes, SchemaProperty, SchemaObjectClass, QmfData, + QmfEvent, SchemaMethod, Notifier, SchemaClassId) from qmfAgent import (Agent, QmfAgentData) @@ -20,9 +18,9 @@ self._sema4.release() def waitForWork(self): - logging.error("Waiting for event...") + print("Waiting for event...") self._sema4.acquire() - logging.error("...event present") + print("...event present") @@ -31,58 +29,54 @@ # _notifier = ExampleNotifier() -_agent = Agent( "redhat.com", "qmf", "testAgent", _notifier ) +_agent = Agent( "qmf.testAgent", _notifier=_notifier ) # Dynamically construct a class schema -_schema = SchemaObjectClass( "MyPackage", "MyClass", - desc="A test data schema", - _pkey=["index1", "index2"] ) +_schema = SchemaObjectClass( _classId=SchemaClassId("MyPackage", "MyClass"), + _desc="A test data schema", + _object_id_names=["index1", "index2"] ) # add properties -_schema.addProperty( "index1", - SchemaProperty(qmfTypes.TYPE_UINT8)) -_schema.addProperty( "index2", - SchemaProperty(qmfTypes.TYPE_LSTR)) +_schema.add_property( "index1", SchemaProperty(qmfTypes.TYPE_UINT8)) +_schema.add_property( "index2", SchemaProperty(qmfTypes.TYPE_LSTR)) + # these two properties are statistics -_schema.addProperty( "query_count", - SchemaProperty(qmfTypes.TYPE_UINT32)) -_schema.addProperty( "method_call_count", - SchemaProperty(qmfTypes.TYPE_UINT32)) +_schema.add_property( "query_count", SchemaProperty(qmfTypes.TYPE_UINT32)) +_schema.add_property( "method_call_count", SchemaProperty(qmfTypes.TYPE_UINT32)) + # These two properties can be set via the method call -_schema.addProperty( "set_string", - SchemaProperty(qmfTypes.TYPE_LSTR)) -_schema.addProperty( "set_int", - SchemaProperty(qmfTypes.TYPE_UINT32)) +_schema.add_property( "set_string", SchemaProperty(qmfTypes.TYPE_LSTR)) +_schema.add_property( "set_int", SchemaProperty(qmfTypes.TYPE_UINT32)) # add method _meth = SchemaMethod( _desc="Method to set string and int in object." ) -_meth.addArgument( "arg_int", SchemaProperty(qmfTypes.TYPE_UINT32) ) -_meth.addArgument( "arg_str", SchemaProperty(qmfTypes.TYPE_LSTR) ) -_schema.addMethod( "set_meth", _meth ) +_meth.add_argument( "arg_int", SchemaProperty(qmfTypes.TYPE_UINT32) ) +_meth.add_argument( "arg_str", SchemaProperty(qmfTypes.TYPE_LSTR) ) +_schema.add_method( "set_meth", _meth ) # Add schema to Agent -_agent.registerObjectClass(_schema) +_agent.register_object_class(_schema) # instantiate managed data objects matching the schema -_obj = QmfAgentData( _agent, _schema ) -_obj.setProperty("index1", 100) -_obj.setProperty("index2", "a name" ) -_obj.setProperty("set_string", "UNSET") -_obj.setProperty("set_int", 0) -_obj.setProperty("query_count", 0) -_obj.setProperty("method_call_count", 0) -_agent.addObject( _obj ) - -_agent.addObject( QmfAgentData( _agent, _schema, - _props={"index1":99, - "index2": "another name", - "set_string": "UNSET", - "set_int": 0, - "query_count": 0, - "method_call_count": 0} )) +_obj = QmfAgentData( _agent, _schema=_schema ) +_obj.set_value("index1", 100) +_obj.set_value("index2", "a name" ) +_obj.set_value("set_string", "UNSET") +_obj.set_value("set_int", 0) +_obj.set_value("query_count", 0) +_obj.set_value("method_call_count", 0) +_agent.add_object( _obj ) + +_agent.add_object( QmfAgentData( _agent, _schema=_schema, + _values={"index1":99, + "index2": "another name", + "set_string": "UNSET", + "set_int": 0, + "query_count": 0, + "method_call_count": 0} )) ## Now connect to the broker @@ -100,18 +94,18 @@ while _wi: print("work item %d:%s" % (_wi.getType(), str(_wi.getParams()))) _agent.releaseWorkItem(_wi) - _wi = _agent.getNextWorkitem(timeout=0) + _wi = _agent.getNextWorkItem(timeout=0) except: - logging.info( "shutting down..." ) + print( "shutting down..." ) _done = True -logging.info( "Removing connection... TBD!!!" ) +print( "Removing connection... TBD!!!" ) #_myConsole.remove_connection( _c, 10 ) -logging.info( "Destroying agent... TBD!!!" ) +print( "Destroying agent... TBD!!!" ) #_myConsole.destroy( 10 ) -logging.info( "******** agent test done ********" ) +print( "******** agent test done ********" ) Modified: qpid/branches/qmfv2/qpid/python/qmf/test/console_test.py URL: http://svn.apache.org/viewvc/qpid/branches/qmfv2/qpid/python/qmf/test/console_test.py?rev=898057&r1=898056&r2=898057&view=diff ============================================================================== --- qpid/branches/qmfv2/qpid/python/qmf/test/console_test.py (original) +++ qpid/branches/qmfv2/qpid/python/qmf/test/console_test.py Mon Jan 11 20:46:07 2010 @@ -4,7 +4,7 @@ from qpid.messaging import * -from qmfCommon import (Notifier, QmfQuery) +from qmfCommon import (Notifier, QmfQuery, MsgKey, SchemaClassId, SchemaClass) from qmfConsole import Console @@ -16,18 +16,18 @@ self._sema4.release() def waitForWork(self): - logging.error("Waiting for event...") + print("Waiting for event...") self._sema4.acquire() - logging.error("...event present") + print("...event present") logging.getLogger().setLevel(logging.INFO) -logging.info( "Starting Connection" ) +print( "Starting Connection" ) _c = Connection("localhost") _c.connect() -logging.info( "Starting Console" ) +print( "Starting Console" ) _notifier = ExampleNotifier() _myConsole = Console(notifier=_notifier) @@ -40,30 +40,65 @@ _query = {QmfQuery._TARGET: {QmfQuery._TARGET_AGENT:None}, QmfQuery._PREDICATE: - {QmfQuery._LOGIC_AND: - [{QmfQuery._CMP_EQ: ["vendor", "redhat.com"]}, - {QmfQuery._CMP_EQ: ["product", "qmf"]}]}} + {QmfQuery._CMP_EQ: ["_name", "qmf.testAgent"]}} _query = QmfQuery(_query) _myConsole.enableAgentDiscovery(_query) _done = False while not _done: - try: - _notifier.waitForWork() +# try: + _notifier.waitForWork() - _wi = _myConsole.get_next_workitem(timeout=0) - while _wi: - print("!!! work item received %d:%s" % (_wi.getType(), str(_wi.getParams()))) - _wi = _myConsole.get_next_workitem(timeout=0) - except: - logging.info( "shutting down..." ) - _done = True + _wi = _myConsole.getNextWorkItem(timeout=0) + while _wi: + print("!!! work item received %d:%s" % (_wi.getType(), + str(_wi.getParams()))) -logging.info( "Removing connection" ) + + if _wi.getType() == _wi.AGENT_ADDED: + _agent = _wi.getParams().get("agent") + if not _agent: + print("!!!! AGENT IN REPLY IS NULL !!! ") + + _query = QmfQuery( {QmfQuery._TARGET: + {QmfQuery._TARGET_PACKAGES:None}} ) + + _reply = _myConsole.doQuery(_agent, _query) + + package_list = _reply.get(MsgKey.package_info) + for pname in package_list: + print("!!! Querying for schema from package: %s" % pname) + _query = QmfQuery({QmfQuery._TARGET: + {QmfQuery._TARGET_SCHEMA_ID:None}, + QmfQuery._PREDICATE: + {QmfQuery._CMP_EQ: + [SchemaClassId.KEY_PACKAGE, pname]}}) + + _reply = _myConsole.doQuery(_agent, _query) + + schema_id_list = _reply.get(MsgKey.schema_id) + for sid_map in schema_id_list: + _query = QmfQuery({QmfQuery._TARGET: + {QmfQuery._TARGET_SCHEMA:None}, + QmfQuery._PREDICATE: + {QmfQuery._CMP_EQ: + [SchemaClass.KEY_SCHEMA_ID, sid_map]}}) + + _reply = _myConsole.doQuery(_agent, _query) + + + + _myConsole.releaseWorkItem(_wi) + _wi = _myConsole.getNextWorkItem(timeout=0) +# except: +# logging.info( "shutting down..." ) +# _done = True + +print( "Removing connection" ) _myConsole.removeConnection( _c, 10 ) -logging.info( "Destroying console:" ) +print( "Destroying console:" ) _myConsole.destroy( 10 ) -logging.info( "******** console test done ********" ) +print( "******** console test done ********" ) --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:[email protected]
