Added: qpid/branches/qmfv2/qpid/python/qmf/qmfConsole.py URL: http://svn.apache.org/viewvc/qpid/branches/qmfv2/qpid/python/qmf/qmfConsole.py?rev=889416&view=auto ============================================================================== --- qpid/branches/qmfv2/qpid/python/qmf/qmfConsole.py (added) +++ qpid/branches/qmfv2/qpid/python/qmf/qmfConsole.py Thu Dec 10 20:32:21 2009 @@ -0,0 +1,1129 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +import sys +import os +import logging +import platform +import time +import Queue +from threading import Thread +from threading import Lock +from threading import currentThread +from threading import Condition + +from qpid.messaging import * + +from qmfCommon import (AMQP_QMF_DIRECT, AMQP_QMF_NAME_SEPARATOR, AMQP_QMF_AGENT_INDICATION, + AMQP_QMF_AGENT_LOCATE) +from qmfCommon import AgentId + + + +# global flag that indicates which thread (if any) is +# running the console callback +_callback_thread=None + + + + +##============================================================================== +## Sequence Manager +##============================================================================== + +class _Mailbox(object): + """ + Virtual base class for all Mailbox-like objects + """ + def __init__(self): + self._msgs = [] + self._cv = Condition() + self._waiting = False + + def deliver(self, obj): + self._cv.acquire() + try: + self._msgs.append(obj) + # if was empty, notify waiters + if len(self._msgs) == 1: + self._cv.notify() + finally: + self._cv.release() + + def fetch(self, timeout=None): + self._cv.acquire() + try: + if len(self._msgs): + return self._msgs.pop() + self._cv.wait(timeout) + if len(self._msgs): + return self._msgs.pop() + return None + finally: + self._cv.release() + + + +class SequencedWaiter(object): + """ + Manage sequence numbers for asynchronous method calls. + Allows the caller to associate a generic piece of data with a unique sequence + number.""" + + def __init__(self): + self.lock = Lock() + self.sequence = 1L + self.pending = {} + + + def allocate(self): + """ + Reserve a sequence number. + + @rtype: long + @return: a unique nonzero sequence number. + """ + self.lock.acquire() + try: + seq = self.sequence + self.sequence = self.sequence + 1 + self.pending[seq] = _Mailbox() + finally: + self.lock.release() + logging.debug( "sequence %d allocated" % seq) + return seq + + + def put_data(self, seq, new_data): + seq = long(seq) + logging.debug( "putting data [%r] to seq %d..." % (new_data, seq) ) + self.lock.acquire() + try: + if seq in self.pending: + self.pending[seq].deliver(new_data) + else: + logging.error( "seq %d not found!" % seq ) + finally: + self.lock.release() + + + + def get_data(self, seq, timeout=None): + """ + Release a sequence number reserved using the reserve method. This must + be called when the sequence is no longer needed. + + @type seq: int + @param seq: a sequence previously allocated by calling reserve(). + @rtype: any + @return: the data originally associated with the reserved sequence number. + """ + seq = long(seq) + logging.debug( "getting data for seq=%d" % seq) + mbox = None + self.lock.acquire() + try: + if seq in self.pending: + mbox = self.pending[seq] + finally: + self.lock.release() + + # Note well: pending list is unlocked, so we can wait. + # we reference mbox locally, so it will not be released + # until we are done. + + if mbox: + d = mbox.fetch(timeout) + logging.debug( "seq %d fetched %r!" % (seq, d) ) + return d + + logging.debug( "seq %d not found!" % seq ) + return None + + + def release(self, seq): + """ + Release the sequence, and its mailbox + """ + seq = long(seq) + logging.debug( "releasing seq %d" % seq ) + self.lock.acquire() + try: + if seq in self.pending: + del self.pending[seq] + finally: + self.lock.release() + + + +#class ObjectProxy(QmfObject): +class ObjectProxy(object): + """ + A local representation of a QmfObject that is managed by a remote agent. + """ + def __init__(self, agent, cls, kwargs={}): + """ + @type agent: qmfConsole.AgentProxy + @param agent: Agent that manages this object. + @type cls: qmfCommon.SchemaObjectClass + @param cls: Schema that describes the class. + @type kwargs: dict + @param kwargs: ??? supported keys ??? + """ + # QmfObject.__init__(self, cls, kwargs) + self._agent = agent + + # def update(self): + def refresh(self, timeout = None): + """ + Called to re-fetch the current state of the object from the agent. This updates + the contents of the object to their most current values. + + @rtype: bool + @return: True if refresh succeeded. Refresh may fail if agent does not respond. + """ + if not self._agent: + raise Exception("No Agent associated with this object") + newer = self._agent.get_object(Query({"object_id":object_id}), timeout) + if newer == None: + logging.error("Failed to retrieve object %s from agent %s" % (str(self), str(self._agent))) + raise Exception("Failed to retrieve object %s from agent %s" % (str(self), str(self._agent))) + self.mergeUpdate(newer) ### ??? in Rafi's console.py::Object Class + + ### def _merge_update(self, newerObject): + ### ??? in Rafi's console.py::Object Class + + + ### def is_deleted(self): + ### ??? in Rafi's console.py::Object Class + + def key(self): pass + + + + +class AgentProxy(object): + """ + A local representation of a remote agent managed by this console. + """ + def __init__(self, name): + """ + @type name: AgentId + @param name: uniquely identifies this agent in the AMQP domain. + """ + if not name or not isinstance(name, AgentId): + raise Exception( "Attempt to create an Agent without supplying a valid agent name." ); + + self._name = name + self._address = AMQP_QMF_DIRECT + AMQP_QMF_NAME_SEPARATOR + str(name) + self._console = None + self._sender = None + self._packages = [] # list of package names known to this agent + self._classes = {} # dict [key:class] of classes known to this agent + self._subscriptions = [] # list of active standing subscriptions for this agent + self._exists = False # true when Agent Announce is received from this agent + logging.debug( "Created AgentProxy with address: [%s]" % self._address ) + + + def key(self): + return str(self._name) + + + def _send_msg(self, msg): + """ + Low-level routine to asynchronously send a message to this agent. + """ + msg.reply_to = self._console.address() + handle = self._console._req_correlation.allocate() + if handle == 0: + raise Exception("Can not allocate a sequence id!") + msg.correlation_id = str(handle) + self._sender.send(msg) + return handle + + + + def _fetch_reply_msg(self, handle, timeout=None): + """ + Low-level routine to wait for an expected reply from the agent. + """ + if handle == 0: + raise Exception("Invalid handle") + msg = self._console._req_correlation.get_data( handle, timeout ) + if not msg: + logging.debug("timed out waiting for reply message") + self._console._req_correlation.release( handle ) + return msg + + + def get_packages(self): + """ + Return a list of the names of all packages known to this agent. + """ + return self._packages[:] + + def get_classes(self): + """ + Return a dictionary [key:class] of classes known to this agent. + """ + return self._classes[:] + + def get_objects(self, query, kwargs={}): + """ + Return a list of objects that satisfy the given query. + + @type query: dict, or qmfCommon.Query + @param query: filter for requested objects + @type kwargs: dict + @param kwargs: ??? used to build match selector and query ??? + @rtype: list + @return: list of matching objects, or None. + """ + pass + + def get_object(self, query, kwargs={}): + """ + Get one object - query is expected to match only one object. + ??? Recommended: explicit timeout param, default None ??? + + @type query: dict, or qmfCommon.Query + @param query: filter for requested objects + @type kwargs: dict + @param kwargs: ??? used to build match selector and query ??? + @rtype: qmfConsole.ObjectProxy + @return: one matching object, or none + """ + pass + + + def create_subscription(self, query): + """ + Factory for creating standing subscriptions based on a given query. + + @type query: qmfCommon.Query object + @param query: determines the list of objects for which this subscription applies + @rtype: qmfConsole.Subscription + @returns: an object representing the standing subscription. + """ + pass + + def __repr__(self): + return self._address + + def __str__(self): + return self.__repr__() + + + + ##============================================================================== + ## CONSOLE + ##============================================================================== + + + +class WorkItem(object): + """ + Describes an event that has arrived at the Console for the + application to process. The Notifier is invoked when one or + more of these WorkItems become available for processing. + """ + # + # Enumeration of the types of WorkItems produced by the Console + # + AGENT_ADDED = 1 + AGENT_DELETED = 2 + NEW_PACKAGE = 3 + NEW_CLASS = 4 + OBJECT_UPDATE = 5 + EVENT_RECEIVED = 7 + AGENT_HEARTBEAT = 8 + + def __init__(self, kind, kwargs={}): + """ + Used by the Console to create a work item. + + @type kind: int + @param kind: work item type + """ + self._kind = kind + self._param_map = kwargs + + + def getType(self): + return self._kind + + def getParams(self): + return self._param_map + + + +class Notifier(object): + """ + Virtual base class that defines a call back which alerts the application that + a QMF Console notification is pending. + """ + def console_indication(self): + """ + Called when one or more console items are ready for the console application to process. + This method may be called by the internal console management thread. Its purpose is to + indicate that the console application should process pending items. + """ + pass + + + + +class Console(Thread): + """ + A Console manages communications to a collection of agents on behalf of an application. + """ + def __init__(self, name=None, notifier=None, kwargs={}): + """ + @type name: str + @param name: identifier for this console. Must be unique. + @type notifier: qmfConsole.Notifier + @param notifier: invoked when events arrive for processing. + @type kwargs: dict + @param kwargs: ??? Unused + """ + Thread.__init__(self) + self._name = name + if not self._name: + self._name = "qmfc-%s.%d" % (platform.node(), os.getpid()) + self._address = AMQP_QMF_DIRECT + AMQP_QMF_NAME_SEPARATOR + self._name + self._notifier = notifier + self._conn = None + self._session = None + # dict of "agent-direct-address":AgentProxy entries + self._agent_map = {} + self._agent_map_lock = Lock() + self._direct_recvr = None + self._announce_recvr = None + self._locate_sender = None + self._schema_cache = {} + self._req_correlation = SequencedWaiter() + self._operational = False + self._agent_discovery = False + # lock out run() thread + self._cv = Condition() + # for passing WorkItems to the application + self._work_q = Queue.Queue() + ## Old stuff below??? + #self._broker_list = [] + #self.impl = qmfengine.Console() + #self._event = qmfengine.ConsoleEvent() + ##self._cv = Condition() + ##self._sync_count = 0 + ##self._sync_result = None + ##self._select = {} + ##self._cb_cond = Condition() + + + + def destroy(self, timeout=None): + """ + Must be called before the Console is deleted. + Frees up all resources and shuts down all background threads. + + @type timeout: float + @param timeout: maximum time in seconds to wait for all background threads to terminate. Default: forever. + """ + logging.debug("Destroying Console...") + if self._conn: + self.remove_connection(self._conn, timeout) + logging.debug("Console Destroyed") + + + + def add_connection(self, conn): + """ + Add a AMQP connection to the console. The console will setup a session over the + connection. The console will then broadcast an Agent Locate Indication over + the session in order to discover present agents. + + @type conn: qpid.messaging.Connection + @param conn: the connection to the AMQP messaging infrastructure. + """ + if self._conn: + raise Exception( "Multiple connections per Console not supported." ); + self._conn = conn + self._session = conn.session(name=self._name) + self._direct_recvr = self._session.receiver(self._address) + self._announce_recvr = self._session.receiver(AMQP_QMF_AGENT_INDICATION) + self._locate_sender = self._session.sender(AMQP_QMF_AGENT_LOCATE) + # + # Now that receivers are created, fire off the receive thread... + # + self._operational = True + self.start() + + + + def remove_connection(self, conn, timeout=None): + """ + Remove an AMQP connection from the console. Un-does the add_connection() operation, + and releases any agents and sessions associated with the connection. + + @type conn: qpid.messaging.Connection + @param conn: connection previously added by add_connection() + """ + if self._conn and conn and conn != self._conn: + logging.error( "Attempt to delete unknown connection: %s" % str(conn)) + + # tell connection thread to shutdown + self._operational = False + if self.isAlive(): + # kick my thread to wake it up + logging.debug("Making temp sender for [%s]" % self._address) + tmp_sender = self._session.sender(self._address) + try: + msg = Message(subject="qmf4", + properties={"method":"request", + "opcode":"console-ping"}, + content={"data":"ignore"}) + tmp_sender.send( msg, sync=True ) + except SendError, e: + logging.error(str(e)) + logging.debug("waiting for console receiver thread to exit") + self.join(timeout) + if self.isAlive(): + logging.error( "Console thread '%s' is hung..." % self.getName() ) + self._direct_recvr.close() + self._announce_recvr.close() + self._locate_sender.close() + self._session = None + self._conn = None + logging.debug("console connection removal complete") + + + def address(self): + """ + The AMQP address this Console is listening to. + """ + return self._address + + + def create_agent( self, agent_name ): + """ + Factory to create/retrieve an agent for this console + """ + if not isinstance(agent_name, AgentId): + raise TypeError("agent_name must be an instance of AgentId") + + agent = AgentProxy(agent_name) + + self._agent_map_lock.acquire() + try: + if agent_name in self._agent_map: + return self._agent_map[agent_name] + + agent._console = self + agent._sender = self._session.sender(agent._address) + self._agent_map[agent_name] = agent + finally: + self._agent_map_lock.release() + + return agent + + + + def destroy_agent( self, agent ): + """ + Undoes create. + """ + if not isinstance(agent, AgentProxy): + raise TypeError("agent must be an instance of AgentProxy") + + self._agent_map_lock.acquire() + try: + if agent._name in self._agent_map: + del self._agent_map[agent._name] + finally: + self._agent_map_lock.release() + + + + + def find_agent(self, agent_name, timeout=None ): + """ + Given the name of a particular agent, return an AgentProxy representing + that agent. Return None if the agent does not exist. + """ + self._agent_map_lock.acquire() + try: + if agent_name in self._agent_map: + return self._agent_map[agent_name] + finally: + self._agent_map_lock.release() + + new_agent = self.create_agent(agent_name) + msg = Message(subject="qmf4", + properties={"method":"request", + "opcode":"agent-locate"}, + content={"query": {"vendor" : agent_name.vendor(), + "product" : agent_name.product(), + "name" : agent_name.name()}}) + handle = new_agent._send_msg(msg) + if handle == 0: + raise Exception("Failed to send Agent locate message to agent %s" % str(agent_name)) + + msg = new_agent._fetch_reply_msg(handle, timeout) + if not msg: + logging.debug("Unable to contact agent '%s' - no reply." % agent_name) + self.destroy_agent(new_agent) + return None + # @todo - for now, dump the message + logging.info( "agent-locate reply received for %s" % agent_name) + return new_agent + + + + def run(self): + global _callback_thread + # + # @todo KAG Rewrite when api supports waiting on multiple receivers + # + while self._operational: + + qLen = self._work_q.qsize() + + try: + msg = self._announce_recvr.fetch(timeout = 0) + if msg: + self._rcv_announce(msg) + except: + pass + + try: + msg = self._direct_recvr.fetch(timeout = 0) + if msg: + self._rcv_direct(msg) + except: + pass + + # try: + # logging.error("waiting for next rcvr...") + # rcvr = self._session.next_receiver() + # except: + # logging.error("exception during next_receiver()") + + # logging.error("rcvr=%s" % str(rcvr)) + + + if qLen == 0 and self._work_q.qsize() and self._notifier: + # work queue went non-empty, kick + # the application... + + _callback_thread = currentThread() + logging.info("Calling console indication") + self._notifier.console_indication() + _callback_thread = None + + while self._operational and \ + self._announce_recvr.pending() == 0 and \ + self._direct_recvr.pending(): + time.sleep(0.5) + + logging.debug("Shutting down Console thread") + + + + # called by run() thread ONLY + # + def _rcv_announce(self, msg): + """ + PRIVATE: Process a message received on the announce receiver + """ + logging.info( "Announce message received!" ) + if msg.subject != "qmf4": + logging.debug("Ignoring non-qmf message '%s'" % msg.subject) + return + + amap = {} + if msg.content_type == "amqp/map": + amap = msg.content + + if (not msg.properties or + not "method" in msg.properties or + not "opcode" in msg.properties): + logging.error("INVALID MESSAGE PROPERTIES: '%s'" % str(msg.properties)) + return + + if msg.properties["method"] == "indication": + # agent indication + if msg.properties["opcode"] == "agent": + if "name" in amap: + if self._agent_discovery: + ind = amap["name"] + if "vendor" in ind and "product" in ind and "name" in ind: + + agent = self.create_agent(AgentId( ind["vendor"], + ind["product"], + ind["name"] )) + if not agent._exists: + # new agent + agent._exists = True + logging.info("AGENT_ADDED for %s" % agent) + wi = WorkItem(WorkItem.AGENT_ADDED, + {"agent": agent}) + self._work_q.put(wi) + else: + logging.warning("Ignoring message with unrecognized 'opcode' value: '%s'" + % msg.properties["opcode"]) + else: + logging.warning("Ignoring message with unrecognized 'method' value: '%s'" + % msg.properties["method"] ) + + + + + # called by run() thread ONLY + # + def _rcv_direct(self, msg): + """ + PRIVATE: Process a message sent to my direct receiver + """ + logging.info( "direct message received!" ) + if msg.correlation_id: + self._req_correlation.put_data(msg.correlation_id, msg) + + + + def enable_agent_discovery(self): + """ + Called to enable the asynchronous Agent Discovery process. + Once enabled, AGENT_ADD work items can arrive on the WorkQueue. + """ + if not self._agent_discovery: + self._agent_discovery = True + msg = Message(subject="qmf4", + properties={"method":"request", + "opcode":"agent-locate"}, + content={"query": {"vendor": "*", + "product": "*", + "name": "*"}}) + self._locate_sender.send(msg) + + + + def disable_agent_discovery(self): + """ + Called to disable the async Agent Discovery process enabled by + calling enable_agent_discovery() + """ + self._agent_discovery = False + + + + def get_workitem_count(self): + """ + Returns the count of pending WorkItems that can be retrieved. + """ + return self._work_q.qsize() + + + + def get_next_workitem(self, timeout=None): + """ + Returns the next pending work item, or None if none available. + @todo: subclass and return an Empty event instead. + """ + try: + wi = self._work_q.get(True, timeout) + except Queue.Empty: + return None + return wi + + + def release_workitem(self, wi): + """ + Return a WorkItem to the Console when it is no longer needed. + @todo: call Queue.task_done() - only 2.5+ + + @type wi: class qmfConsole.WorkItem + @param wi: work item object to return. + """ + pass + + + + # def get_packages(self): + # plist = [] + # for i in range(self.impl.packageCount()): + # plist.append(self.impl.getPackageName(i)) + # return plist + + + # def get_classes(self, package, kind=CLASS_OBJECT): + # clist = [] + # for i in range(self.impl.classCount(package)): + # key = self.impl.getClass(package, i) + # class_kind = self.impl.getClassKind(key) + # if class_kind == kind: + # if kind == CLASS_OBJECT: + # clist.append(SchemaObjectClass(None, None, {"impl":self.impl.getObjectClass(key)})) + # elif kind == CLASS_EVENT: + # clist.append(SchemaEventClass(None, None, {"impl":self.impl.getEventClass(key)})) + # return clist + + + # def bind_package(self, package): + # return self.impl.bindPackage(package) + + + # def bind_class(self, kwargs = {}): + # if "key" in kwargs: + # self.impl.bindClass(kwargs["key"]) + # elif "package" in kwargs: + # package = kwargs["package"] + # if "class" in kwargs: + # self.impl.bindClass(package, kwargs["class"]) + # else: + # self.impl.bindClass(package) + # else: + # raise Exception("Argument error: invalid arguments, use 'key' or 'package'[,'class']") + + + # def get_agents(self, broker=None): + # blist = [] + # if broker: + # blist.append(broker) + # else: + # self._cv.acquire() + # try: + # # copy while holding lock + # blist = self._broker_list[:] + # finally: + # self._cv.release() + + # agents = [] + # for b in blist: + # for idx in range(b.impl.agentCount()): + # agents.append(AgentProxy(b.impl.getAgent(idx), b)) + + # return agents + + + # def get_objects(self, query, kwargs = {}): + # timeout = 30 + # agent = None + # temp_args = kwargs.copy() + # if type(query) == type({}): + # temp_args.update(query) + + # if "_timeout" in temp_args: + # timeout = temp_args["_timeout"] + # temp_args.pop("_timeout") + + # if "_agent" in temp_args: + # agent = temp_args["_agent"] + # temp_args.pop("_agent") + + # if type(query) == type({}): + # query = Query(temp_args) + + # self._select = {} + # for k in temp_args.iterkeys(): + # if type(k) == str: + # self._select[k] = temp_args[k] + + # self._cv.acquire() + # try: + # self._sync_count = 1 + # self._sync_result = [] + # broker = self._broker_list[0] + # broker.send_query(query.impl, None, agent) + # self._cv.wait(timeout) + # if self._sync_count == 1: + # raise Exception("Timed out: waiting for query response") + # finally: + # self._cv.release() + + # return self._sync_result + + + # def get_object(self, query, kwargs = {}): + # ''' + # Return one and only one object or None. + # ''' + # objs = objects(query, kwargs) + # if len(objs) == 1: + # return objs[0] + # else: + # return None + + + # def first_object(self, query, kwargs = {}): + # ''' + # Return the first of potentially many objects. + # ''' + # objs = objects(query, kwargs) + # if objs: + # return objs[0] + # else: + # return None + + + # # Check the object against select to check for a match + # def _select_match(self, object): + # schema_props = object.properties() + # for key in self._select.iterkeys(): + # for prop in schema_props: + # if key == p[0].name() and self._select[key] != p[1]: + # return False + # return True + + + # def _get_result(self, list, context): + # ''' + # Called by Broker proxy to return the result of a query. + # ''' + # self._cv.acquire() + # try: + # for item in list: + # if self._select_match(item): + # self._sync_result.append(item) + # self._sync_count -= 1 + # self._cv.notify() + # finally: + # self._cv.release() + + + # def start_sync(self, query): pass + + + # def touch_sync(self, sync): pass + + + # def end_sync(self, sync): pass + + + + +# def start_console_events(self): +# self._cb_cond.acquire() +# try: +# self._cb_cond.notify() +# finally: +# self._cb_cond.release() + + +# def _do_console_events(self): +# ''' +# Called by the Console thread to poll for events. Passes the events +# onto the ConsoleHandler associated with this Console. Is called +# periodically, but can also be kicked by Console.start_console_events(). +# ''' +# count = 0 +# valid = self.impl.getEvent(self._event) +# while valid: +# count += 1 +# try: +# if self._event.kind == qmfengine.ConsoleEvent.AGENT_ADDED: +# logging.debug("Console Event AGENT_ADDED received") +# if self._handler: +# self._handler.agent_added(AgentProxy(self._event.agent, None)) +# elif self._event.kind == qmfengine.ConsoleEvent.AGENT_DELETED: +# logging.debug("Console Event AGENT_DELETED received") +# if self._handler: +# self._handler.agent_deleted(AgentProxy(self._event.agent, None)) +# elif self._event.kind == qmfengine.ConsoleEvent.NEW_PACKAGE: +# logging.debug("Console Event NEW_PACKAGE received") +# if self._handler: +# self._handler.new_package(self._event.name) +# elif self._event.kind == qmfengine.ConsoleEvent.NEW_CLASS: +# logging.debug("Console Event NEW_CLASS received") +# if self._handler: +# self._handler.new_class(SchemaClassKey(self._event.classKey)) +# elif self._event.kind == qmfengine.ConsoleEvent.OBJECT_UPDATE: +# logging.debug("Console Event OBJECT_UPDATE received") +# if self._handler: +# self._handler.object_update(ConsoleObject(None, {"impl":self._event.object}), +# self._event.hasProps, self._event.hasStats) +# elif self._event.kind == qmfengine.ConsoleEvent.EVENT_RECEIVED: +# logging.debug("Console Event EVENT_RECEIVED received") +# elif self._event.kind == qmfengine.ConsoleEvent.AGENT_HEARTBEAT: +# logging.debug("Console Event AGENT_HEARTBEAT received") +# if self._handler: +# self._handler.agent_heartbeat(AgentProxy(self._event.agent, None), self._event.timestamp) +# elif self._event.kind == qmfengine.ConsoleEvent.METHOD_RESPONSE: +# logging.debug("Console Event METHOD_RESPONSE received") +# else: +# logging.debug("Console thread received unknown event: '%s'" % str(self._event.kind)) +# except e: +# print "Exception caught in callback thread:", e +# self.impl.popEvent() +# valid = self.impl.getEvent(self._event) +# return count + + + + + +# class Broker(ConnectionHandler): +# # attr_reader :impl :conn, :console, :broker_bank +# def __init__(self, console, conn): +# self.broker_bank = 1 +# self.console = console +# self.conn = conn +# self._session = None +# self._cv = Condition() +# self._stable = None +# self._event = qmfengine.BrokerEvent() +# self._xmtMessage = qmfengine.Message() +# self.impl = qmfengine.BrokerProxy(self.console.impl) +# self.console.impl.addConnection(self.impl, self) +# self.conn.add_conn_handler(self) +# self._operational = True + + +# def shutdown(self): +# logging.debug("broker.shutdown() called.") +# self.console.impl.delConnection(self.impl) +# self.conn.del_conn_handler(self) +# if self._session: +# self.impl.sessionClosed() +# logging.debug("broker.shutdown() sessionClosed done.") +# self._session.destroy() +# logging.debug("broker.shutdown() session destroy done.") +# self._session = None +# self._operational = False +# logging.debug("broker.shutdown() done.") + + +# def wait_for_stable(self, timeout = None): +# self._cv.acquire() +# try: +# if self._stable: +# return +# if timeout: +# self._cv.wait(timeout) +# if not self._stable: +# raise Exception("Timed out: waiting for broker connection to become stable") +# else: +# while not self._stable: +# self._cv.wait() +# finally: +# self._cv.release() + + +# def send_query(self, query, ctx, agent): +# agent_impl = None +# if agent: +# agent_impl = agent.impl +# self.impl.sendQuery(query, ctx, agent_impl) +# self.conn.kick() + + +# def _do_broker_events(self): +# count = 0 +# valid = self.impl.getEvent(self._event) +# while valid: +# count += 1 +# if self._event.kind == qmfengine.BrokerEvent.BROKER_INFO: +# logging.debug("Broker Event BROKER_INFO received"); +# elif self._event.kind == qmfengine.BrokerEvent.DECLARE_QUEUE: +# logging.debug("Broker Event DECLARE_QUEUE received"); +# self.conn.impl.declareQueue(self._session.handle, self._event.name) +# elif self._event.kind == qmfengine.BrokerEvent.DELETE_QUEUE: +# logging.debug("Broker Event DELETE_QUEUE received"); +# self.conn.impl.deleteQueue(self._session.handle, self._event.name) +# elif self._event.kind == qmfengine.BrokerEvent.BIND: +# logging.debug("Broker Event BIND received"); +# self.conn.impl.bind(self._session.handle, self._event.exchange, self._event.name, self._event.bindingKey) +# elif self._event.kind == qmfengine.BrokerEvent.UNBIND: +# logging.debug("Broker Event UNBIND received"); +# self.conn.impl.unbind(self._session.handle, self._event.exchange, self._event.name, self._event.bindingKey) +# elif self._event.kind == qmfengine.BrokerEvent.SETUP_COMPLETE: +# logging.debug("Broker Event SETUP_COMPLETE received"); +# self.impl.startProtocol() +# elif self._event.kind == qmfengine.BrokerEvent.STABLE: +# logging.debug("Broker Event STABLE received"); +# self._cv.acquire() +# try: +# self._stable = True +# self._cv.notify() +# finally: +# self._cv.release() +# elif self._event.kind == qmfengine.BrokerEvent.QUERY_COMPLETE: +# result = [] +# for idx in range(self._event.queryResponse.getObjectCount()): +# result.append(ConsoleObject(None, {"impl":self._event.queryResponse.getObject(idx), "broker":self})) +# self.console._get_result(result, self._event.context) +# elif self._event.kind == qmfengine.BrokerEvent.METHOD_RESPONSE: +# obj = self._event.context +# obj._method_result(MethodResponse(self._event.methodResponse())) + +# self.impl.popEvent() +# valid = self.impl.getEvent(self._event) + +# return count + + +# def _do_broker_messages(self): +# count = 0 +# valid = self.impl.getXmtMessage(self._xmtMessage) +# while valid: +# count += 1 +# logging.debug("Broker: sending msg on connection") +# self.conn.impl.sendMessage(self._session.handle, self._xmtMessage) +# self.impl.popXmt() +# valid = self.impl.getXmtMessage(self._xmtMessage) + +# return count + + +# def _do_events(self): +# while True: +# self.console.start_console_events() +# bcnt = self._do_broker_events() +# mcnt = self._do_broker_messages() +# if bcnt == 0 and mcnt == 0: +# break; + + +# def conn_event_connected(self): +# logging.debug("Broker: Connection event CONNECTED") +# self._session = Session(self.conn, "qmfc-%s.%d" % (socket.gethostname(), os.getpid()), self) +# self.impl.sessionOpened(self._session.handle) +# self._do_events() + + +# def conn_event_disconnected(self, error): +# logging.debug("Broker: Connection event DISCONNECTED") +# pass + + +# def conn_event_visit(self): +# self._do_events() + + +# def sess_event_session_closed(self, context, error): +# logging.debug("Broker: Session event CLOSED") +# self.impl.sessionClosed() + + +# def sess_event_recv(self, context, message): +# logging.debug("Broker: Session event MSG_RECV") +# if not self._operational: +# logging.warning("Unexpected session event message received by Broker proxy: context='%s'" % str(context)) +# self.impl.handleRcvMessage(message) +# self._do_events() + + +
Added: qpid/branches/qmfv2/qpid/python/qmf/test/agent_test.py URL: http://svn.apache.org/viewvc/qpid/branches/qmfv2/qpid/python/qmf/test/agent_test.py?rev=889416&view=auto ============================================================================== --- qpid/branches/qmfv2/qpid/python/qmf/test/agent_test.py (added) +++ qpid/branches/qmfv2/qpid/python/qmf/test/agent_test.py Thu Dec 10 20:32:21 2009 @@ -0,0 +1,58 @@ +import logging +import time + +from qpid.messaging import * +from qmfCommon import (AgentId, SchemaEventClassFactory, qmfTypes, SchemaProperty, + SchemaObjectClass, ObjectIdFactory, QmfData, QmfDescribed, + QmfDescribedFactory, QmfManaged, QmfManagedFactory, QmfDataFactory, + QmfEvent, SchemaMethod) +from qmfAgent import (Agent, QmfAgentData) + + +class MyAgent(object): + def main(self): + + self._agent = Agent( "redhat.com", "qmf", "testAgent" ) + + # Dynamically construct a class schema + + _schema = SchemaObjectClass( "MyPackage", "MyClass", + desc="A test data schema", + _pkey=["index1", "index2"] ) + # add properties + _schema.addProperty( "index1", + SchemaProperty(qmfTypes.TYPE_UINT8)) + _schema.addProperty( "index2", + SchemaProperty(qmfTypes.TYPE_LSTR)) + + # add method + _meth = SchemaMethod( _desc="A test method" ) + _meth.addArgument( "arg1", SchemaProperty(qmfTypes.TYPE_UINT32) ) + _meth.addArgument( "arg2", SchemaProperty(qmfTypes.TYPE_LSTR) ) + _meth.addArgument( "arg3", SchemaProperty(qmfTypes.TYPE_BOOL) ) + + _schema.addMethod( "meth_3", _meth ) + + # Add schema to Agent + + self._agent.registerObjectClass(_schema) + + # instantiate managed data objects matching the schema + + obj = QmfAgentData( self._agent, _schema ) + obj.setProperty("index1", 100) + obj.setProperty("index2", "a name" ) + + + self._agent.addObject( QmfAgentData( self._agent, _schema, + _props={"index1":99, + "index2": "another name"} )) + + + return None + + + + +app = MyAgent() +print( "s='%s'", str(app.main())) --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:[email protected]
