Author: aconway
Date: Thu Apr 24 17:54:05 2014
New Revision: 1589807

URL: http://svn.apache.org/r1589807
Log:
QPID-5719: HA becomes unresponsive once any of the brokers are SIGSTOPed

- Added timeout to qpid-ha.
- qpidd init script pings broker to verify it is not hung.
- Updated documentation in qpid/doc/book/src/cpp-broker/Active-Passive-Cluster.xml.

The new results for the cases mentioned in the bug:

a] stopped ALL brokers: rgmanager restarts the entire cluster but data is lost.
   Equivalent to killing all the brokers at once. This does not affect quorum
   because only qpidd services are affected, not other services managed by cman.

b] stopped the primary: rgmanager restarts the primary after a timeout and
   promotes one of the backups.

c] stopped a backup: rgmanager restarts the backup after a timeout.
   Clients that are actively sending messages may see a delay while the backup
   is restarted.

Note that you need to set link-heartbeat-interval in qpidd.conf. The default is
very high (120 seconds); it should be set lower to see recovery from SIGSTOP in
a reasonable time.
See the updated documentation in
qpid/doc/book/src/cpp-broker/Active-Passive-Cluster.xml.

Modified:
    qpid/trunk/qpid/cpp/etc/qpidd-primary.in
    qpid/trunk/qpid/cpp/etc/qpidd.in
    qpid/trunk/qpid/cpp/src/tests/ha_test.py
    qpid/trunk/qpid/cpp/src/tests/ha_tests.py
    qpid/trunk/qpid/doc/book/src/cpp-broker/Active-Passive-Cluster.xml
    qpid/trunk/qpid/tools/src/py/qpid-ha

Modified: qpid/trunk/qpid/cpp/etc/qpidd-primary.in
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/etc/qpidd-primary.in?rev=1589807&r1=1589806&r2=1589807&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/etc/qpidd-primary.in (original)
+++ qpid/trunk/qpid/cpp/etc/qpidd-primary.in Thu Apr 24 17:54:05 2014
@@ -45,6 +45,7 @@ QPID_HA_OPTIONS="--config $QPID_CONFIG"
 
 # Source configuration
 test -f @sysconfdir@/sysconfig/$prog && source @sysconfdir@/sysconfig/$prog
+source /etc/rc.d/init.d/functions
 
 # Check presence of executables/scripts
 for f in $QPID_INIT $QPID_HA; do
@@ -53,8 +54,6 @@ done
 
 QPID_HA="$QPID_HA $QPID_HA_OPTIONS"
 
-source /etc/rc.d/init.d/functions
-
 RETVAL=0
 
 status() {

Modified: qpid/trunk/qpid/cpp/etc/qpidd.in
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/etc/qpidd.in?rev=1589807&r1=1589806&r2=1589807&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/etc/qpidd.in (original)
+++ qpid/trunk/qpid/cpp/etc/qpidd.in Thu Apr 24 17:54:05 2014
@@ -41,32 +41,36 @@ pidfile=/var/run/qpidd.pid
 
 # The following variables can be overridden in @sysconfdir@/sysconfig/$prog
 QPID_BIN=@sbindir@/$prog
-QPID_CONFIG=@confdir@/qpidd.conf
 QPID_DATA_DIR=/var/lib/qpidd
+QPID_CONFIG=@confdir@/qpidd.conf
+QPID_HA=@bindir@/qpid-ha
+QPID_HA_OPTIONS="--config $QPID_CONFIG"
 
 # Source configuration
-if [ -f @sysconfdir@/sysconfig/$prog ] ; then
-       . @sysconfdir@/sysconfig/$prog
-fi
+test -f @sysconfdir@/sysconfig/$prog && source @sysconfdir@/sysconfig/$prog
+source /etc/rc.d/init.d/functions
 
-# Source function library.
-. /etc/rc.d/init.d/functions
+# Check presence of executables/scripts
+for f in $QPID_BIN $QPID_HA; do
+    test -x $f || { echo "$f not found or not executable"; exit 5; }
+done
 
-RETVAL=0
+QPID_HA="$QPID_HA $QPID_HA_OPTIONS"
 
-#ensure binary is present and executable
-if [[ !(-x @sbindir@/$prog) ]] ; then
-    echo "@sbindir@/$prog not found or not executable"
-    exit 5
-fi
+RETVAL=0
 
-#ensure user has sufficient permissions
+# Ensure user has sufficient permissions
 runuser -s /bin/sh qpidd -c "echo x > /dev/null" 2> /dev/null || RETVAL=4
 if [ $RETVAL = 4 ]; then
     echo "user had insufficient privilege";
     exit $RETVAL
 fi
 
+do_status() {
+    # Check PID file and ping for liveness
+    status $prog && $QPID_HA ping
+}
+
 start() {
         echo -n $"Starting Qpid AMQP daemon: "
        daemon --pidfile $pidfile --check $prog --user qpidd $QPID_BIN --config 
$QPID_CONFIG --data-dir $QPID_DATA_DIR --daemon $QPIDD_OPTIONS
@@ -77,7 +81,7 @@ start() {
            touch $pidfile
            chown qpidd.qpidd $pidfile
             [ -x /sbin/restorecon ] && /sbin/restorecon $pidfile
-           runuser - -s /bin/sh qpidd -c "$QPID_BIN --check > $pidfile"
+           runuser - -s /bin/sh qpidd -c "$QPID_BIN --config $QPID_CONFIG 
--check > $pidfile"
        fi
        return $RETVAL
 }
@@ -106,7 +110,7 @@ case "$1" in
        $1
        ;;
   status)
-       status $prog
+       do_status
        RETVAL=$?
        ;;
   force-reload)

Modified: qpid/trunk/qpid/cpp/src/tests/ha_test.py
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ha_test.py?rev=1589807&r1=1589806&r2=1589807&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ha_test.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/ha_test.py Thu Apr 24 17:54:05 2014
@@ -131,12 +131,14 @@ class HaBroker(Broker):
                  "--link-maintenance-interval=0.1",
                  # Heartbeat and negotiate time are needed so that a broker 
wont
                  # stall on an address that doesn't currently have a broker 
running.
-                 "--link-heartbeat-interval=%s"%(HaBroker.heartbeat),
                  "--max-negotiate-time=1000",
                  "--ha-cluster=%s"%ha_cluster]
         # Add default --log-enable arguments unless args already has --log 
arguments.
         if not [l for l in args if l.startswith("--log")]:
             args += ["--log-enable=info+", "--log-enable=debug+:ha::"]
+        if not [h for h in args if h.startswith("--link-heartbeat-interval")]:
+            args += ["--link-heartbeat-interval=%s"%(HaBroker.heartbeat)]
+
         if ha_replicate is not None:
             args += [ "--ha-replicate=%s"%ha_replicate ]
         if brokers_url: args += [ "--ha-brokers-url", brokers_url ]

Modified: qpid/trunk/qpid/cpp/src/tests/ha_tests.py
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ha_tests.py?rev=1589807&r1=1589806&r2=1589807&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ha_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/ha_tests.py Thu Apr 24 17:54:05 2014
@@ -1213,6 +1213,14 @@ class RecoveryTests(HaBrokerTest):
         cluster.bounce(0, promote_next=False)
         cluster[0].promote()
 
+    def test_stalled_backup(self):
+        """Make sure that a stalled backup broker does not stall the primary"""
+        # FIXME aconway 2014-04-15: merge with test_join_ready_cluster?
+        cluster = HaCluster(self, 3, args=["--link-heartbeat-interval=1"])
+        os.kill(cluster[1].pid, signal.SIGSTOP)
+        s = cluster[0].connect().session()
+        s.sender("q;{create:always}").send("x")
+        self.assertEqual("x", s.receiver("q").fetch(0).content)
 
 class ConfigurationTests(HaBrokerTest):
     """Tests for configuration settings."""

Modified: qpid/trunk/qpid/doc/book/src/cpp-broker/Active-Passive-Cluster.xml
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/doc/book/src/cpp-broker/Active-Passive-Cluster.xml?rev=1589807&r1=1589806&r2=1589807&view=diff
==============================================================================
--- qpid/trunk/qpid/doc/book/src/cpp-broker/Active-Passive-Cluster.xml 
(original)
+++ qpid/trunk/qpid/doc/book/src/cpp-broker/Active-Passive-Cluster.xml Thu Apr 
24 17:54:05 2014
@@ -335,9 +335,9 @@ ssl_addr = "ssl:" host [":" port]'
            </entry>
            <entry>
              <para>
-               Interval for the broker to check link health and re-connect 
links if need
-               be. If you want brokers to fail over quickly you can set this 
to a
-               fraction of a second, for example: 0.1.
+               Interval for backup brokers to check the link to the primary 
re-connect if need be.
+               Default 2 seconds. Can be set lower for faster failover, e.g. 
0.1 seconds.
+               Setting it too low will result in excessive link-checking 
activity on the brokers.
              </para>
            </entry>
          </row>
@@ -348,8 +348,12 @@ ssl_addr = "ssl:" host [":" port]'
            </entry>
            <entry>
              <para>
-               Heartbeat interval for replication links. The link will be 
assumed broken
-               if there is no heartbeat for twice the interval.
+               Heartbeat interval for replication links and timeout for broker 
status checks.
+               It may take up to this interval for rgmanager to detect a hung 
or partitioned broker.
+               The primary may take up to twice this interval to detect a hung 
or partitioned backup.
+               Clients sending messages may be held up during this time.
+               Default 120 seconds: you will probably want to set this to a 
lower value e.g. 10.
+               If set too low, a slow broker may be considered as failed and 
killed.
              </para>
            </entry>
          </row>
@@ -430,8 +434,13 @@ NOTE: fencing is not shown, you must con
     <clusternode name="node2.example.com" nodeid="2"/>
     <clusternode name="node3.example.com" nodeid="3"/>
   </clusternodes>
+
   <!-- Resouce Manager configuration. -->
-  <rm>
+
+   status_poll_interval is the interval in seconds that the resource manager 
checks the status
+   of managed services. This affects how quickly the manager will detect 
failed services.
+   -->
+  <rm status_poll_interval="1">
     <!--
        There is a failoverdomain for each node containing just that node.
        This lets us stipulate that the qpidd service should always run on each 
node.
@@ -455,8 +464,12 @@ NOTE: fencing is not shown, you must con
       <!-- This script promotes the qpidd broker on this node to primary. -->
       <script file="/etc/init.d/qpidd-primary" name="qpidd-primary"/>
 
-      <!-- This is a virtual IP address for client traffic. -->
-      <ip address="20.0.20.200" monitor_link="1"/>
+      <!--
+          This is a virtual IP address for client traffic.
+         monitor_link="yes" means monitor the health of the NIC used for the 
VIP.
+         sleeptime="0" means don't delay when failing over the VIP to a new 
address.
+      -->
+      <ip address="20.0.20.200" monitor_link="yes" sleeptime="0"/>
     </resources>
 
     <!-- There is a qpidd service on each node, it should be restarted if it 
fails. -->

Modified: qpid/trunk/qpid/tools/src/py/qpid-ha
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/tools/src/py/qpid-ha?rev=1589807&r1=1589806&r2=1589807&view=diff
==============================================================================
--- qpid/trunk/qpid/tools/src/py/qpid-ha (original)
+++ qpid/trunk/qpid/tools/src/py/qpid-ha Thu Apr 24 17:54:05 2014
@@ -22,6 +22,7 @@
 import optparse, sys, time, os, re
 from qpid.messaging import Connection
 from qpid.messaging import Message as QpidMessage
+from qpid.util import URL
 from qpidtoollibs.broker import BrokerAgent
 from qpidtoollibs.config import parse_qpidd_conf
 try:
@@ -31,6 +32,10 @@ except ImportError:
 
 # QMF address for the HA broker object.
 HA_BROKER = "org.apache.qpid.ha:habroker:ha-broker"
+# Define these defaults here rather than in add_option because we want
+# to use qpidd.conf for defaults if --config is specified and
+# these defaults otherwise:
+DEFAULTS = { "broker":"localhost", "timeout":10.0}
 
 class ExitStatus(Exception):
     """Raised if a command want's a non-0 exit status from the script"""
@@ -40,31 +45,41 @@ class Command:
     commands = {}
 
     def add(self, optname, metavar, type, help):
-        self.op.add_option(optname, metavar=metavar, type=type, help=help, 
action="store")
+        self.op.add_option(optname, metavar=metavar, type=type, help=help)
 
-    def __init__(self, name, help, arg_names=[]):
+    def __init__(self, name, help, arg_names=[], connect_agent=True):
+        """@param connect_agent true if we should establish a QMF agent 
connection"""
         Command.commands[name] = self
         self.name = name
+        self.connect_agent = connect_agent
         self.arg_names = arg_names
         usage="%s [options] %s\n\n%s"%(name, " ".join(arg_names), help)
         self.help = help
         self.op=optparse.OptionParser(usage)
-        self.op.add_option("-b", "--broker", action="store", type="string", 
default="localhost:5672", metavar="<address>", help="Address of qpidd broker 
with syntax: [username/password@] hostname | ip-address [:<port>]")
-        self.op.add_option("--sasl-mechanism", metavar="<mech>", help="SASL 
mechanism for authentication (e.g. EXTERNAL, ANONYMOUS, PLAIN, CRAM-MD5, 
DIGEST-MD5, GSSAPI). SASL automatically picks the most secure available 
mechanism - use this option to override.")
-        self.op.add_option("--ssl-certificate", action="store", type="string", 
metavar="<cert>", help="Client SSL certificate (PEM Format)")
-        self.op.add_option("--ssl-key", action="store", type="string", 
metavar="<key>", help="Client SSL private key (PEM Format)")
+        def help_default(what): return " (Default %s)"%DEFAULTS[what]
+        self.op.add_option("-b", "--broker", metavar="<address>", 
help="Address of qpidd broker with syntax: [username/password@] hostname | 
ip-address [:<port>]"+help_default("broker"))
+        self.op.add_option("--timeout", type="float", metavar="<seconds>", 
help="Give up if the broker does not respond within the timeout. 0 means wait 
forever"+help_default("timeout"))
+        self.op.add_option("--sasl-mechanism", metavar="<mech>", help="SASL 
mechanism for authentication (e.g. EXTERNAL, ANONYMOUS, PLAIN, CRAM-MD5, 
DIGEST-MD5, GSSAPI). SASL automatically picks the most secure available 
mechanism - use this option to override")
+        self.op.add_option("--ssl-certificate", metavar="<cert>", help="Client 
SSL certificate (PEM Format)")
+        self.op.add_option("--ssl-key", metavar="<key>", help="Client SSL 
private key (PEM Format)")
         self.op.add_option("--config", metavar="<path/to/qpidd.conf>", 
help="Connect to the local qpidd by reading its configuration file.")
 
     def connect(self, opts):
         conn_options = {}
-        if opts.config:         # Use broker config file.
+        if not opts.broker: opts.broker = DEFAULTS["broker"]
+        url = URL(opts.broker)
+        if opts.config:         # Use broker config file for defaults
             config = parse_qpidd_conf(opts.config)
-            def joinif(separator, items): return separator.join(filter(None, 
items))
-            userpass = joinif("/", [config.get("ha-username"), 
config.get("ha-password")])
-            hostport = joinif(":", ["localhost", config.get("port")])
-            opts.broker = joinif("@", [userpass, hostport])
-            opts.sasl_mechanism = config.get("ha-mechanism")
-
+            if not url.user: url.user = config.get("ha-username")
+            if not url.password: url.password = config.get("ha-password")
+            if not url.port: url.port = config.get("port")
+            opts.broker = str(url)
+            if not opts.sasl_mechanism: opts.sasl_mechanism = 
config.get("ha-mechanism")
+            if  not opts.timeout:
+                timeout = config.get("ha-heartbeat-interval") or 
config.get("link-heartbeat-interval")
+                if timeout: opts.timeout = float(timeout)
+        else:                   # Use DEFAULTS
+            if not opts.timeout: opts.timeout = DEFAULTS["timeout"]
         if opts.sasl_mechanism: conn_options['sasl_mechanisms'] = 
opts.sasl_mechanism
         if opts.ssl_certificate: conn_options['ssl_certfile'] = 
opts.ssl_certificate
         if opts.ssl_key:
@@ -72,9 +87,12 @@ class Command:
                 self.op.error("missing '--ssl-certificate' (required by 
'--ssl-key')")
             conn_options['ssl_keyfile'] = opts.ssl_key
         conn_options['client_properties'] = {'qpid.ha-admin' : 1}
+        if opts.timeout:
+            conn_options['timeout'] = opts.timeout
+            conn_options['heartbeat'] = int(opts.timeout)
         connection = Connection.establish(opts.broker, **conn_options)
-        qmf_broker = BrokerAgent(connection)
-        ha_broker = qmf_broker.getHaBroker()
+        qmf_broker = self.connect_agent and BrokerAgent(connection)
+        ha_broker = self.connect_agent and qmf_broker.getHaBroker()
         return (connection, qmf_broker, ha_broker)
 
     def execute(self, args):
@@ -82,14 +100,22 @@ class Command:
         if len(args) != len(self.arg_names)+1:
             self.op.print_help()
             raise Exception("Wrong number of arguments")
-        connection, qmf_broker, ha_broker = self.connect(opts)
-        if not ha_broker: raise Exception("HA module is not loaded on broker 
at %s" % opts.broker)
+        self.connection, qmf_broker, ha_broker = self.connect(opts)
+        if self.connect_agent and not ha_broker:
+            raise Exception("HA module is not loaded on broker at %s" % 
opts.broker)
         try: self.do_execute(qmf_broker, ha_broker, opts, args)
-        finally: connection.close()
+        finally: self.connection.close()
 
     def do_execute(self, qmf_broker, opts, args):
         raise Exception("Command '%s' is not yet implemented"%self.name)
 
+class PingCmd(Command):
+    def __init__(self):
+        Command.__init__(self, "ping","Check if the broker is alive and 
responding", connect_agent=False)
+    def do_execute(self, qmf_broker, ha_broker, opts, args):
+        self.connection.session() # Make sure we can establish a session.
+PingCmd()
+
 class PromoteCmd(Command):
     def __init__(self):
         Command.__init__(self, "promote","Promote broker from backup to 
primary")
@@ -101,19 +127,20 @@ class StatusCmd(Command):
     def __init__(self):
         Command.__init__(self, "status", "Print HA status")
         self.op.add_option(
-            "--expect", type="string", metavar="<status>",
+            "--expect", metavar="<status>",
             help="Don't print status. Return 0 if it matches <status>, 1 
otherwise")
         self.op.add_option(
             "--is-primary", action="store_true", default=False,
             help="Don't print status. Return 0 if the broker is primary, 1 
otherwise")
         self.op.add_option(
             "--all", action="store_true", default=False,
-            help="Print status for all brokers in the cluster.")
+            help="Print status for all brokers in the cluster")
     def do_execute(self, qmf_broker, ha_broker, opts, args):
         if opts.is_primary:
             if not ha_broker.status in ["active", "recovering"]: raise 
ExitStatus(1)
         if opts.expect:
             if opts.expect != ha_broker.status: raise ExitStatus(1)
+        # The brokersUrl setting is not in python UR format, simpler parsing 
here.
         brokers = filter(None, re.sub(r'(^amqps?:)|(tcp:)', "", 
ha_broker.brokersUrl).split(","))
         if opts.all and brokers:
             opts.all=False
@@ -129,7 +156,6 @@ class StatusCmd(Command):
                     print b, e
         else:
             print ha_broker.status
-
 StatusCmd()
 
 class ReplicateCmd(Command):
@@ -200,7 +226,7 @@ def main(argv):
     except ExitStatus, e:
         return e.status
     except Exception, e:
-        print e
+        print "%s: %s"%(type(e).__name__, e)
         return 1
 
 if __name__ == "__main__":



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to