Author: tross
Date: Fri Jun 26 12:57:43 2009
New Revision: 788681
URL: http://svn.apache.org/viewvc?rev=788681&view=rev
Log:
Added --timeout options to cli tools.
Cli tools will not hang indefinitely if the broker is non-responsive.
Modified:
qpid/trunk/qpid/python/commands/qpid-cluster
qpid/trunk/qpid/python/commands/qpid-config
qpid/trunk/qpid/python/commands/qpid-route
qpid/trunk/qpid/python/commands/qpid-stat
qpid/trunk/qpid/python/commands/qpid-tool
qpid/trunk/qpid/python/qmf/console.py
qpid/trunk/qpid/python/qpid/managementdata.py
Modified: qpid/trunk/qpid/python/commands/qpid-cluster
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/python/commands/qpid-cluster?rev=788681&r1=788680&r2=788681&view=diff
==============================================================================
--- qpid/trunk/qpid/python/commands/qpid-cluster (original)
+++ qpid/trunk/qpid/python/commands/qpid-cluster Fri Jun 26 12:57:43 2009
@@ -28,6 +28,7 @@
from qmf.console import Session
_host = "localhost"
+_connTimeout = 10
_stopId = None
_stopAll = False
_force = False
@@ -42,6 +43,7 @@
print " ex: localhost, 10.1.1.7:10000, broker-host:10000,
guest/gu...@localhost"
print
print "Options:"
+ print " --timeout seconds (10) Maximum time to wait for broker
connection"
print " -C [--all-connections] View client connections to all
cluster members"
print " -c [--connections] ID View client connections to
specified member"
print " -d [--del-connection] HOST:PORT"
@@ -88,7 +90,7 @@
def SetBroker(self, brokerUrl):
self.url = brokerUrl
self.qmf = Session()
- self.broker = self.qmf.addBroker(brokerUrl)
+ self.broker = self.qmf.addBroker(brokerUrl, _connTimeout)
agents = self.qmf.getAgents()
for a in agents:
if a.getAgentBank() == 0:
@@ -200,7 +202,7 @@
idx = 0
for host in hostList:
if _showConn == "all" or _showConn == idList[idx] or _delConn:
- self.brokers.append(self.qmf.addBroker(host))
+ self.brokers.append(self.qmf.addBroker(host, _connTimeout))
displayList.append(idList[idx])
idx += 1
@@ -247,7 +249,7 @@
##
try:
- longOpts = ("stop=", "all-stop", "force", "connections=",
"all-connections" "del-connection=", "numeric")
+ longOpts = ("stop=", "all-stop", "force", "connections=",
"all-connections" "del-connection=", "numeric", "timeout=")
(optlist, encArgs) = getopt.gnu_getopt(sys.argv[1:], "s:kfCc:d:n",
longOpts)
except:
Usage()
@@ -260,6 +262,10 @@
count = 0
for opt in optlist:
+ if opt[0] == "--timeout":
+ _connTimeout = int(opt[1])
+ if _connTimeout == 0:
+ _connTimeout = None
if opt[0] == "-s" or opt[0] == "--stop":
_stopId = opt[1]
if len(_stopId.split(":")) != 2:
@@ -316,7 +322,7 @@
if e.__repr__().find("connection aborted") > 0:
# we expect this when asking the connected broker to shut down
sys.exit(0)
- print "Failed:", e.args
+ print "Failed: %s - %s" % (e.__class__.__name__, e)
sys.exit(1)
bm.Disconnect()
Modified: qpid/trunk/qpid/python/commands/qpid-config
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/python/commands/qpid-config?rev=788681&r1=788680&r2=788681&view=diff
==============================================================================
--- qpid/trunk/qpid/python/commands/qpid-config (original)
+++ qpid/trunk/qpid/python/commands/qpid-config Fri Jun 26 12:57:43 2009
@@ -27,6 +27,7 @@
_recursive = False
_host = "localhost"
+_connTimeout = 10
_altern_ex = None
_passive = False
_durable = False
@@ -67,6 +68,7 @@
print " qpid-config [OPTIONS] unbind <exchange-name> <queue-name>
[binding-key]"
print
print "Options:"
+ print " --timeout seconds (10) Maximum time to wait
for broker connection"
print " -b [ --bindings ] Show bindings in
queue or exchange list"
print " -a [ --broker-addr ] Address (localhost) Address of qpidd
broker"
print " broker-addr is in the form: [username/passw...@]
hostname | ip-address [:<port>]"
@@ -135,7 +137,7 @@
def SetBroker (self, brokerUrl):
self.url = brokerUrl
self.qmf = Session()
- self.broker = self.qmf.addBroker(brokerUrl)
+ self.broker = self.qmf.addBroker(brokerUrl, _connTimeout)
agents = self.qmf.getAgents()
for a in agents:
if a.getAgentBank() == 0:
@@ -371,7 +373,7 @@
longOpts = ("durable", "cluster-durable", "bindings", "broker-addr=",
"file-count=",
"file-size=", "max-queue-size=", "max-queue-count=",
"limit-policy=",
"order=", "sequence", "ive", "generate-queue-events=",
"force", "force-if-not-empty",
- "force_if_used", "alternate-exchange=", "passive")
+ "force_if_used", "alternate-exchange=", "passive", "timeout=")
(optlist, encArgs) = getopt.gnu_getopt (sys.argv[1:], "a:b", longOpts)
except:
Usage ()
@@ -387,6 +389,10 @@
_recursive = True
if opt[0] == "-a" or opt[0] == "--broker-addr":
_host = opt[1]
+ if opt[0] == "--timeout":
+ _connTimeout = int(opt[1])
+ if _connTimeout == 0:
+ _connTimeout = None
if opt[0] == "--alternate-exchange":
_altern_ex = opt[1]
if opt[0] == "--passive":
Modified: qpid/trunk/qpid/python/commands/qpid-route
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/python/commands/qpid-route?rev=788681&r1=788680&r2=788681&view=diff
==============================================================================
--- qpid/trunk/qpid/python/commands/qpid-route (original)
+++ qpid/trunk/qpid/python/commands/qpid-route Fri Jun 26 12:57:43 2009
@@ -43,6 +43,7 @@
print " qpid-route [OPTIONS] link list [<dest-broker>]"
print
print "Options:"
+ print " --timeout seconds (10) Maximum time to wait for broker
connection"
print " -v [ --verbose ] Verbose output"
print " -q [ --quiet ] Quiet output, don't print duplicate
warnings"
print " -d [ --durable ] Added configuration shall be durable"
@@ -64,13 +65,14 @@
_srclocal = False
_transport = "tcp"
_ack = 0
+_connTimeout = 10
class RouteManager:
def __init__(self, localBroker):
self.local = BrokerURL(localBroker)
self.remote = None
self.qmf = Session()
- self.broker = self.qmf.addBroker(localBroker)
+ self.broker = self.qmf.addBroker(localBroker, _connTimeout)
def disconnect(self):
self.qmf.delBroker(self.broker)
@@ -143,7 +145,7 @@
if url.name() not in brokerList:
print " %s..." % url.name(),
try:
- b = qmf.addBroker("%s:%d" % (link.host, link.port))
+ b = qmf.addBroker("%s:%d" % (link.host, link.port),
_connTimeout)
brokerList[url.name()] = b
added = True
print "Ok"
@@ -403,7 +405,7 @@
##
try:
- longOpts = ("verbose", "quiet", "durable", "del-empty-link", "src-local",
"transport=", "ack=")
+ longOpts = ("verbose", "quiet", "durable", "del-empty-link", "src-local",
"transport=", "ack=", "timeout=")
(optlist, encArgs) = getopt.gnu_getopt(sys.argv[1:], "vqdest:", longOpts)
except:
Usage()
@@ -415,6 +417,10 @@
cargs = encArgs
for opt in optlist:
+ if opt[0] == "--timeout":
+ _connTimeout = int(opt[1])
+ if _connTimeout == 0:
+ _connTimeout = None
if opt[0] == "-v" or opt[0] == "--verbose":
_verbose = True
if opt[0] == "-q" or opt[0] == "--quiet":
@@ -512,7 +518,7 @@
Usage()
except Exception,e:
- print "Failed:", e.args
+ print "Failed: %s - %s" % (e.__class__.__name__, e)
sys.exit(1)
rm.disconnect()
Modified: qpid/trunk/qpid/python/commands/qpid-stat
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/python/commands/qpid-stat?rev=788681&r1=788680&r2=788681&view=diff
==============================================================================
--- qpid/trunk/qpid/python/commands/qpid-stat (original)
+++ qpid/trunk/qpid/python/commands/qpid-stat Fri Jun 26 12:57:43 2009
@@ -29,7 +29,7 @@
from qpid.disp import Display, Header, Sorter
_host = "localhost"
-_top = False
+_connTimeout = 10
_types = ""
_limit = 50
_increasing = False
@@ -42,10 +42,10 @@
print " broker-addr is in the form: [username/passw...@]
hostname | ip-address [:<port>]"
print " ex: localhost, 10.1.1.7:10000, broker-host:10000,
guest/gu...@localhost"
print
-# print "General Options:"
+ print "General Options:"
+ print " --timeout seconds (10) Maximum time to wait for broker
connection"
# print " -n [--numeric] Don't resolve names"
-# print " -t [--top] Repeatedly display top items"
-# print
+ print
print "Display Options:"
print
print " -b Show Brokers"
@@ -144,7 +144,7 @@
def SetBroker(self, brokerUrl):
self.url = brokerUrl
self.qmf = Session()
- self.broker = self.qmf.addBroker(brokerUrl)
+ self.broker = self.qmf.addBroker(brokerUrl, _connTimeout)
agents = self.qmf.getAgents()
for a in agents:
if a.getAgentBank() == 0:
@@ -389,7 +389,7 @@
self.qmf.delBroker(self.broker)
self.broker = None
for host in hostList:
- b = self.qmf.addBroker(host)
+ b = self.qmf.addBroker(host, _connTimeout)
self.brokers.append(Broker(self.qmf, b))
else:
self.brokers.append(Broker(self.qmf, self.broker))
@@ -402,7 +402,7 @@
##
try:
- longOpts = ("top", "numeric", "sort-by=", "limit=", "increasing")
+ longOpts = ("top", "numeric", "sort-by=", "limit=", "increasing",
"timeout=")
(optlist, encArgs) = getopt.gnu_getopt(sys.argv[1:], "bceqS:L:I", longOpts)
except:
Usage()
@@ -414,8 +414,10 @@
cargs = encArgs
for opt in optlist:
- if opt[0] == "-t" or opt[0] == "--top":
- _top = True
+ if opt[0] == "--timeout":
+ _connTimeout = int(opt[1])
+ if _connTimeout == 0:
+ _connTimeout = None
elif opt[0] == "-n" or opt[0] == "--numeric":
_numeric = True
elif opt[0] == "-S" or opt[0] == "--sort-by":
@@ -448,8 +450,7 @@
except KeyboardInterrupt:
print
except Exception,e:
- print "Failed:", e.args
- #raise # TODO: Remove before flight
+ print "Failed: %s - %s" % (e.__class__.__name__, e)
sys.exit(1)
bm.Disconnect()
Modified: qpid/trunk/qpid/python/commands/qpid-tool
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/python/commands/qpid-tool?rev=788681&r1=788680&r2=788681&view=diff
==============================================================================
--- qpid/trunk/qpid/python/commands/qpid-tool (original)
+++ qpid/trunk/qpid/python/commands/qpid-tool Fri Jun 26 12:57:43 2009
@@ -24,7 +24,7 @@
import sys
import socket
from cmd import Cmd
-from qpid.connection import ConnectionFailed
+from qpid.connection import ConnectionFailed, Timeout
from qpid.managementdata import ManagementData
from shlex import split
from qpid.disp import Display
@@ -183,6 +183,8 @@
except Exception, e:
if str(e).find ("Exchange not found") != -1:
print "Management not enabled on broker: Use '-m yes' option on broker
startup."
+ else:
+ print "Failed: %s - %s" % (e.__class__.__name__, e)
sys.exit(1)
# Instantiate the CLI interpreter and launch it.
Modified: qpid/trunk/qpid/python/qmf/console.py
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qmf/console.py?rev=788681&r1=788680&r2=788681&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qmf/console.py (original)
+++ qpid/trunk/qpid/python/qmf/console.py Fri Jun 26 12:57:43 2009
@@ -469,11 +469,11 @@
def __repr__(self):
return "QMF Console Session Manager (brokers: %d)" % len(self.brokers)
- def addBroker(self, target="localhost"):
+ def addBroker(self, target="localhost", timeout=None):
""" Connect to a Qpid broker. Returns an object of type Broker. """
url = BrokerURL(target)
broker = Broker(self, url.host, url.port, url.authMech, url.authName,
url.authPass,
- ssl = url.scheme == URL.AMQPS)
+ ssl = url.scheme == URL.AMQPS, connTimeout=timeout)
self.brokers.append(broker)
if not self.manageConnections:
@@ -1551,11 +1551,12 @@
SYNC_TIME = 60
nextSeq = 1
- def __init__(self, session, host, port, authMech, authUser, authPass,
ssl=False):
+ def __init__(self, session, host, port, authMech, authUser, authPass,
ssl=False, connTimeout=None):
self.session = session
self.host = host
self.port = port
self.ssl = ssl
+ self.connTimeout = connTimeout
self.authUser = authUser
self.authPass = authPass
self.cv = Condition()
@@ -1641,13 +1642,21 @@
sock = connect(self.host, self.port)
sock.settimeout(5)
+ oldTimeout = sock.gettimeout()
+ sock.settimeout(self.connTimeout)
if self.ssl:
- sock = ssl(sock)
- self.conn = Connection(sock, username=self.authUser,
password=self.authPass, heartbeat=2)
+ connSock = ssl(sock)
+ else:
+ connSock = sock
+ self.conn = Connection(connSock, username=self.authUser,
password=self.authPass)
def aborted():
- raise Timeout("read timed out")
+ raise Timeout("Waiting for connection to be established with broker")
+ oldAborted = self.conn.aborted
self.conn.aborted = aborted
self.conn.start()
+ sock.settimeout(oldTimeout)
+ self.conn.aborted = oldAborted
+
self.replyName = "reply-%s" % self.amqpSessionId
self.amqpSession = self.conn.session(self.amqpSessionId)
self.amqpSession.auto_sync = True
@@ -1681,13 +1690,13 @@
self._send(msg)
except socket.error, e:
- self.error = "Socket Error %s - %s" % (e[0], e[1])
+ self.error = "Socket Error %s - %s" % (e.__class__.__name__, e)
raise
except Closed, e:
- self.error = "Connect Failed %d - %s" % (e[0], e[1])
+ self.error = "Connect Failed %d - %s" % (e.__class__.__name__, e)
raise
except ConnectionFailed, e:
- self.error = "Connect Failed %d - %s" % (e[0], e[1])
+ self.error = "Connect Failed %d - %s" % (e.__class__.__name__, e)
raise
def _updateAgent(self, obj):
Modified: qpid/trunk/qpid/python/qpid/managementdata.py
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/managementdata.py?rev=788681&r1=788680&r2=788681&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/managementdata.py (original)
+++ qpid/trunk/qpid/python/qpid/managementdata.py Fri Jun 26 12:57:43 2009
@@ -31,6 +31,7 @@
import os
import platform
import locale
+from qpid.connection import Timeout
from qpid.management import managementChannel, managementClient
from threading import Lock
from disp import Display
@@ -206,11 +207,22 @@
self.sessionId = "%s.%d" % (platform.uname()[1], os.getpid())
self.broker = Broker (host)
- self.conn = Connection (connect (self.broker.host, self.broker.port),
+ sock = connect (self.broker.host, self.broker.port)
+ oldTimeout = sock.gettimeout()
+ sock.settimeout(10)
+ self.conn = Connection (sock,
username=self.broker.username,
password=self.broker.password)
self.spec = self.conn.spec
+ def aborted():
+ raise Timeout("Waiting for connection to be established with broker")
+ oldAborted = self.conn.aborted
+ self.conn.aborted = aborted
+
self.conn.start ()
+ sock.settimeout(oldTimeout)
+ self.conn.aborted = oldAborted
+
self.mclient = managementClient (self.spec, self.ctrlHandler,
self.configHandler,
self.instHandler, self.methodReply,
self.closeHandler)
self.mclient.schemaListener (self.schemaHandler)
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]