Author: tross
Date: Tue Jun 12 19:06:13 2012
New Revision: 1349476

URL: http://svn.apache.org/viewvc?rev=1349476&view=rev
Log:
QPID-4059 - qpid-printevents refactored to use the lighter-weight management library

Modified:
    qpid/trunk/qpid/tools/src/py/qpid-printevents
    qpid/trunk/qpid/tools/src/py/qpidtoollibs/broker.py

Modified: qpid/trunk/qpid/tools/src/py/qpid-printevents
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tools/src/py/qpid-printevents?rev=1349476&r1=1349475&r2=1349476&view=diff
==============================================================================
--- qpid/trunk/qpid/tools/src/py/qpid-printevents (original)
+++ qpid/trunk/qpid/tools/src/py/qpid-printevents Tue Jun 12 19:06:13 2012
@@ -21,34 +21,85 @@
 
 import os
 import optparse
-from optparse import IndentedHelpFormatter
 import sys
-import socket
-from time import time, strftime, gmtime, sleep
-from qmf.console import Console, Session
+from optparse       import IndentedHelpFormatter
+from time           import time, strftime, gmtime, sleep
+from threading      import Lock, Condition, Thread
+from qpid.messaging import Connection
+import qpid.messaging.exceptions
+
+home = os.environ.get("QPID_TOOLS_HOME", os.path.normpath("/usr/share/qpid-tools"))
+sys.path.append(os.path.join(home, "python"))
+
+from qpidtoollibs.broker import EventHelper
+
+
+class Printer(object):
+  """
+  This class serializes printed lines so that events coming from different
+  threads don't overlap each other.
+  """
+  def __init__(self):
+    self.lock = Lock()
 
-
-class EventConsole(Console):
-  def event(self, broker, event):
-    print event
-    sys.stdout.flush()
-
-  def brokerConnected(self, broker):
-    print strftime("%c", gmtime(time())), "NOTIC qpid-printevents:brokerConnected broker=%s" % broker.getUrl()
+  def pr(self, text):
+    self.lock.acquire()
+    try:
+      print text
+    finally:
+      self.lock.release()
     sys.stdout.flush()
+  
 
-  def brokerConnectionFailed(self, broker):
-    print strftime("%c", gmtime(time())), "NOTIC qpid-printevents:brokerConnectionFailed broker=%s %s" % (broker.getUrl(), str(broker.conn_exc))
-    sys.stdout.flush()
+class EventReceiver(Thread):
+  """
+  One instance of this class is created for each broker that is being monitored.
+  This class does not use the "reconnect" option because it needs to report as
+  events when the connection is established and when it's lost.
+  """
+  def __init__(self, printer, url, mechanism, options):
+    Thread.__init__(self)
+    self.printer   = printer
+    self.url       = url
+    self.mechanism = mechanism
+    self.options   = options
+    self.running   = True
+    self.helper    = EventHelper()
+
+  def cancel(self):
+    self.running = False
+
+  def run(self):
+    isOpen = False
+    while self.running:
+      try:
+        conn = Connection.establish(self.url, sasl_mechanisms=self.mechanism, client_properties=self.options)
+        isOpen = True
+        self.printer.pr(strftime("%c", gmtime(time())) + " NOTIC qpid-printevents:brokerConnected broker=%s" % self.url)
+
+        sess = conn.session()
+        rx = sess.receiver(self.helper.eventAddress())
+
+        while self.running:
+          try:
+            msg = rx.fetch(1)
+            event = self.helper.event(msg)
+            self.printer.pr(event.__repr__())
+            sess.acknowledge()
+          except qpid.messaging.exceptions.Empty:
+            pass
+        
+      except Exception, e:
+        if isOpen:
+          self.printer.pr(strftime("%c", gmtime(time())) + " NOTIC qpid-printevents:brokerDisconnected broker=%s" % self.url)
+        isOpen = False
+        sleep(1)
 
-  def brokerDisconnected(self, broker):
-    print strftime("%c", gmtime(time())), "NOTIC qpid-printevents:brokerDisconnected broker=%s" % broker.getUrl()
-    sys.stdout.flush()
 
 class JHelpFormatter(IndentedHelpFormatter):
-    """Format usage and description without stripping newlines from usage strings
     """
-
+    Format usage and description without stripping newlines from usage strings
+    """
     def format_usage(self, usage):
         return usage
 
@@ -87,16 +138,23 @@ def main(argv=None):
   if len(arguments) == 0:
     arguments.append("localhost")
 
-  console = EventConsole()
-  session = Session(console, rcvObjects=False, rcvHeartbeats=options.heartbeats, manageConnections=True)
-  brokers = []
+  brokers   = []
+  mechanism = options.sasl_mechanism
+  props     = {'qpid.ha-admin' : 1}
+  printer   = Printer()
+
+  if options.heartbeats:
+    props['heartbeat'] = 5
+
   try:
     try:
       for host in arguments:
-        brokers.append(session.addBroker(host, None, options.sasl_mechanism))
+        er = EventReceiver(printer, host, mechanism, props)
+        brokers.append(er)
+        er.start()
 
-        while (True):
-          sleep(10)
+      while (True):
+        sleep(10)
 
     except KeyboardInterrupt:
         print
@@ -106,9 +164,10 @@ def main(argv=None):
         print "Failed: %s - %s" % (e.__class__.__name__, e)
         return 1
   finally:
-    while len(brokers):
-      b = brokers.pop()
-      session.delBroker(b)
+    for b in brokers:
+      b.cancel()
+    for b in brokers:
+      b.join()
 
 if __name__ == '__main__':
   sys.exit(main())

Modified: qpid/trunk/qpid/tools/src/py/qpidtoollibs/broker.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tools/src/py/qpidtoollibs/broker.py?rev=1349476&r1=1349475&r2=1349476&view=diff
==============================================================================
--- qpid/trunk/qpid/tools/src/py/qpidtoollibs/broker.py (original)
+++ qpid/trunk/qpid/tools/src/py/qpidtoollibs/broker.py Tue Jun 12 19:06:13 2012
@@ -18,6 +18,7 @@
 #
 
 from qpid.messaging import Message
+from qpidtoollibs.disp import TimeLong
 try:
   from uuid import uuid4
 except ImportError:
@@ -295,6 +296,41 @@ class BrokerAgent(object):
     return self._getBrokerObject(self, _type, oid)
 
 
+class EventHelper(object):
+  def eventAddress(self, pkg='*', cls='*', sev='*'):
+    return "qmf.default.topic/agent.ind.event.%s.%s.%s.#" % (pkg.replace('.', '_'), cls, sev)
+
+  def event(self, msg):
+    return BrokerEvent(msg)
+
+
+class BrokerEvent(object):
+  def __init__(self, msg):
+    self.msg = msg
+    self.content = msg.content[0]
+    self.values = self.content['_values']
+    self.schema_id = self.content['_schema_id']
+    self.name = "%s:%s" % (self.schema_id['_package_name'], self.schema_id['_class_name'])
+
+  def __repr__(self):
+    rep = "%s %s" % (TimeLong(self.getTimestamp()), self.name)
+    for k,v in self.values.items():
+      rep = rep + " %s=%s" % (k, v)
+    return rep
+
+  def __getattr__(self, key):
+    if key not in self.values:
+      return None
+    value = self.values[key]
+    return value
+
+  def getAttributes(self):
+    return self.values
+
+  def getTimestamp(self):
+    return self.content['_timestamp']
+
+
 class BrokerObject(object):
   def __init__(self, broker, content):
     self.broker = broker
@@ -362,7 +398,7 @@ class Connection(BrokerObject):
     BrokerObject.__init__(self, broker, values)
 
   def close(self):
-    pass
+    self.broker._method("close", {}, "org.apache.qpid.broker:connection:%s" % self.address)
 
 class Session(BrokerObject):
   def __init__(self, broker, values):



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to