IMPORTANT: You MUST "svn copy barrier.py base_barrier.py" and
"svn copy barrier_unittest.py base_barrier_unittest.py"
in the client/common_lib/ directory before applying this patch.
barrier cleanups:
* Rename barrier.py to base_barrier.py and add a barrier.py
stub that imports from base_barrier and overrides with site_barrier
if found. barrier_unittest.py is renamed to match.
* Move BarrierAbortError to the error module with everything else.
* Add a test case for a rendezvous_servers that is aborted (abort=True) from the server.
* Move get_sync_control_file() from common_lib.utils to
server.base_utils, where it belongs. This avoids a circular
import: barrier now imports utils, so utils must not import barrier.
Signed-off-by: Gregory Smith <[email protected]>
--- autotest/client/common_lib/barrier.py 2010-04-13 16:00:38.000000000
-0700
+++ autotest/client/common_lib/barrier.py 2010-04-13 18:15:53.000000000
-0700
@@ -1,543 +1,8 @@
-import sys, socket, errno, logging
-from time import time, sleep
-from autotest_lib.client.common_lib import error
+from autotest_lib.client.common_lib.base_barrier import listen_server, barrier
+from autotest_lib.client.common_lib import utils
-# default barrier port
-_DEFAULT_PORT = 11922
-
-
-class BarrierAbortError(error.BarrierError):
- """Special BarrierError raised when an explicit abort is requested."""
-
-
-class listen_server(object):
- """
- Manages a listening socket for barrier.
-
- Can be used to run multiple barrier instances with the same listening
- socket (if they were going to listen on the same port).
-
- Attributes:
-
- @attr address: Address to bind to (string).
- @attr port: Port to bind to.
- @attr socket: Listening socket object.
- """
- def __init__(self, address='', port=_DEFAULT_PORT):
- """
- Create a listen_server instance for the given address/port.
-
- @param address: The address to listen on.
- @param port: The port to listen on.
- """
- self.address = address
- self.port = port
- self.socket = self._setup()
-
-
- def _setup(self):
- """Create, bind and listen on the listening socket."""
- sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- sock.bind((self.address, self.port))
- sock.listen(10)
-
- return sock
-
-
- def close(self):
- """Close the listening socket."""
- self.socket.close()
-
-
-class barrier(object):
- """Multi-machine barrier support.
-
- Provides multi-machine barrier mechanism.
- Execution stops until all members arrive at the barrier.
-
- Implementation Details:
- .......................
-
- When a barrier is forming the master node (first in sort order) in the
- set accepts connections from each member of the set. As they arrive
- they indicate the barrier they are joining and their identifier (their
- hostname or IP address and optional tag). They are then asked to wait.
- When all members are present the master node then checks that each
- member is still responding via a ping/pong exchange. If this is
- successful then everyone has checked in at the barrier. We then tell
- everyone they may continue via a rlse message.
-
- Where the master is not the first to reach the barrier the client
- connects will fail. Client will retry until they either succeed in
- connecting to master or the overall timeout is exceeded.
-
- As an example here is the exchange for a three node barrier called
- 'TAG'
-
- MASTER CLIENT1 CLIENT2
- <-------------TAG C1-------------
- --------------wait-------------->
- [...]
- <-------------TAG C2-----------------------------
- --------------wait------------------------------>
- [...]
- --------------ping-------------->
- <-------------pong---------------
- --------------ping------------------------------>
- <-------------pong-------------------------------
- ----- BARRIER conditions MET -----
- --------------rlse-------------->
- --------------rlse------------------------------>
-
- Note that once the last client has responded to pong the barrier is
- implicitly deemed satisifed, they have all acknowledged their presence.
- If we fail to send any of the rlse messages the barrier is still a
- success, the failed host has effectively broken 'right at the beginning'
- of the post barrier execution window.
-
- In addition, there is another rendezvous, that makes each slave a server
- and the master a client. The connection process and usage is still the
- same but allows barriers from machines that only have a one-way
- connection initiation. This is called rendezvous_servers.
-
- For example:
- if ME == SERVER:
- server start
-
- b = job.barrier(ME, 'server-up', 120)
- b.rendezvous(CLIENT, SERVER)
-
- if ME == CLIENT:
- client run
-
- b = job.barrier(ME, 'test-complete', 3600)
- b.rendezvous(CLIENT, SERVER)
-
- if ME == SERVER:
- server stop
-
- Any client can also request an abort of the job by setting
- abort=True in the rendezvous arguments.
- """
-
- def __init__(self, hostid, tag, timeout=None, port=None,
- listen_server=None):
- """
- @param hostid: My hostname/IP address + optional tag.
- @param tag: Symbolic name of the barrier in progress.
- @param timeout: Maximum seconds to wait for a the barrier to meet.
- @param port: Port number to listen on.
- @param listen_server: External listen_server instance to use instead
- of creating our own. Create a listen_server instance and
- reuse it across multiple barrier instances so that the
- barrier code doesn't try to quickly re-bind on the same port
- (packets still in transit for the previous barrier they may
- reset new connections).
- """
- self._hostid = hostid
- self._tag = tag
- if listen_server:
- if port:
- raise error.BarrierError(
- '"port" and "listen_server" are mutually exclusive.')
- self._port = listen_server.port
- else:
- self._port = port or _DEFAULT_PORT
- self._server = listen_server # A listen_server instance or None.
- self._members = [] # List of hosts we expect to find at the barrier.
- self._timeout_secs = timeout
- self._start_time = None # Timestamp of when we started waiting.
- self._masterid = None # Host/IP + optional tag of selected master.
- logging.info("tag=%s port=%d timeout=%r",
- self._tag, self._port, self._timeout_secs)
-
- # Number of clients seen (should be the length of self._waiting).
- self._seen = 0
-
- # Clients who have checked in and are waiting (if we are a master).
- self._waiting = {} # Maps from hostname -> (client, addr) tuples.
-
-
- def _get_host_from_id(self, hostid):
- # Remove any trailing local identifier following a #.
- # This allows multiple members per host which is particularly
- # helpful in testing.
- if not hostid.startswith('#'):
- return hostid.split('#')[0]
- else:
- raise error.BarrierError(
- "Invalid Host id: Host Address should be specified")
-
-
- def _update_timeout(self, timeout):
- if timeout is not None and self._start_time is not None:
- self._timeout_secs = (time() - self._start_time) + timeout
- else:
- self._timeout_secs = timeout
-
-
- def _remaining(self):
- if self._timeout_secs is not None and self._start_time is not None:
- timeout = self._timeout_secs - (time() - self._start_time)
- if timeout <= 0:
- errmsg = "timeout waiting for barrier: %s" % self._tag
- logging.error(error)
- raise error.BarrierError(errmsg)
- else:
- timeout = self._timeout_secs
-
- if self._timeout_secs is not None:
- logging.info("seconds remaining: %d", timeout)
- return timeout
-
-
- def _master_welcome(self, connection):
- client, addr = connection
- name = None
-
- client.settimeout(5)
- try:
- # Get the clients name.
- intro = client.recv(1024)
- intro = intro.strip("\r\n")
-
- intro_parts = intro.split(' ', 2)
- if len(intro_parts) != 2:
- logging.warn("Ignoring invalid data from %s: %r",
- client.getpeername(), intro)
- client.close()
- return
- tag, name = intro_parts
-
- logging.info("new client tag=%s, name=%s", tag, name)
-
- # Ok, we know who is trying to attach. Confirm that
- # they are coming to the same meeting. Also, everyone
- # should be using a unique handle (their IP address).
- # If we see a duplicate, something _bad_ has happened
- # so drop them now.
- if self._tag != tag:
- logging.warn("client arriving for the wrong barrier: %s != %s",
- self._tag, tag)
- client.settimeout(5)
- client.send("!tag")
- client.close()
- return
- elif name in self._waiting:
- logging.warn("duplicate client")
- client.settimeout(5)
- client.send("!dup")
- client.close()
- return
-
- # Acknowledge the client
- client.send("wait")
-
- except socket.timeout:
- # This is nominally an error, but as we do not know
- # who that was we cannot do anything sane other
- # than report it and let the normal timeout kill
- # us when thats appropriate.
- logging.warn("client handshake timeout: (%s:%d)",
- addr[0], addr[1])
- client.close()
- return
-
- logging.info("client now waiting: %s (%s:%d)",
- name, addr[0], addr[1])
-
- # They seem to be valid record them.
- self._waiting[name] = connection
- self._seen += 1
-
-
- def _slave_hello(self, connection):
- (client, addr) = connection
- name = None
-
- client.settimeout(5)
- try:
- client.send(self._tag + " " + self._hostid)
-
- reply = client.recv(4)
- reply = reply.strip("\r\n")
- logging.info("master said: %s", reply)
-
- # Confirm the master accepted the connection.
- if reply != "wait":
- logging.warn("Bad connection request to master")
- client.close()
- return
-
- except socket.timeout:
- # This is nominally an error, but as we do not know
- # who that was we cannot do anything sane other
- # than report it and let the normal timeout kill
- # us when thats appropriate.
- logging.error("master handshake timeout: (%s:%d)",
- addr[0], addr[1])
- client.close()
- return
-
- logging.info("slave now waiting: (%s:%d)", addr[0], addr[1])
-
- # They seem to be valid record them.
- self._waiting[self._hostid] = connection
- self._seen = 1
-
-
- def _master_release(self):
- # Check everyone is still there, that they have not
- # crashed or disconnected in the meantime.
- allpresent = True
- abort = self._abort
- for name in self._waiting:
- (client, addr) = self._waiting[name]
-
- logging.info("checking client present: %s", name)
-
- client.settimeout(5)
- reply = 'none'
- try:
- client.send("ping")
- reply = client.recv(1024)
- except socket.timeout:
- logging.warn("ping/pong timeout: %s", name)
- pass
-
- if reply == 'abrt':
- logging.warn("Client %s requested abort", name)
- abort = True
- elif reply != "pong":
- allpresent = False
-
- if not allpresent:
- raise error.BarrierError("master lost client")
-
- if abort:
- logging.info("Aborting the clients")
- msg = 'abrt'
- else:
- logging.info("Releasing clients")
- msg = 'rlse'
-
- # If every ones checks in then commit the release.
- for name in self._waiting:
- (client, addr) = self._waiting[name]
-
- client.settimeout(5)
- try:
- client.send(msg)
- except socket.timeout:
- logging.warn("release timeout: %s", name)
- pass
-
- if abort:
- raise BarrierAbortError("Client requested abort")
-
-
- def _waiting_close(self):
- # Either way, close out all the clients. If we have
- # not released them then they know to abort.
- for name in self._waiting:
- (client, addr) = self._waiting[name]
-
- logging.info("closing client: %s", name)
-
- try:
- client.close()
- except:
- pass
-
-
- def _run_server(self, is_master):
- server = self._server or listen_server(port=self._port)
- failed = 0
- try:
- while True:
- try:
- # Wait for callers welcoming each.
- server.socket.settimeout(self._remaining())
- connection = server.socket.accept()
- if is_master:
- self._master_welcome(connection)
- else:
- self._slave_hello(connection)
- except socket.timeout:
- logging.warn("timeout waiting for remaining clients")
- pass
-
- if is_master:
- # Check if everyone is here.
- logging.info("master seen %d of %d",
- self._seen, len(self._members))
- if self._seen == len(self._members):
- self._master_release()
- break
- else:
- # Check if master connected.
- if self._seen:
- logging.info("slave connected to master")
- self._slave_wait()
- break
- finally:
- self._waiting_close()
- # if we created the listening_server in the beginning of this
- # function then close the listening socket here
- if not self._server:
- server.close()
-
-
- def _run_client(self, is_master):
- while self._remaining() is None or self._remaining() > 0:
- try:
- remote = socket.socket(socket.AF_INET,
- socket.SOCK_STREAM)
- remote.settimeout(30)
- if is_master:
- # Connect to all slaves.
- host = self._get_host_from_id(self._members[self._seen])
- logging.info("calling slave: %s", host)
- connection = (remote, (host, self._port))
- remote.connect(connection[1])
- self._master_welcome(connection)
- else:
- # Just connect to the master.
- host = self._get_host_from_id(self._masterid)
- logging.info("calling master")
- connection = (remote, (host, self._port))
- remote.connect(connection[1])
- self._slave_hello(connection)
- except socket.timeout:
- logging.warn("timeout calling host, retry")
- sleep(10)
- pass
- except socket.error, err:
- (code, str) = err
- if (code != errno.ECONNREFUSED):
- raise
- sleep(10)
-
- if is_master:
- # Check if everyone is here.
- logging.info("master seen %d of %d",
- self._seen, len(self._members))
- if self._seen == len(self._members):
- self._master_release()
- break
- else:
- # Check if master connected.
- if self._seen:
- logging.info("slave connected to master")
- self._slave_wait()
- break
-
- self._waiting_close()
-
-
- def _slave_wait(self):
- remote = self._waiting[self._hostid][0]
- mode = "wait"
- while True:
- # All control messages are the same size to allow
- # us to split individual messages easily.
- remote.settimeout(self._remaining())
- reply = remote.recv(4)
- if not reply:
- break
-
- reply = reply.strip("\r\n")
- logging.info("master said: %s", reply)
-
- mode = reply
- if reply == "ping":
- # Ensure we have sufficient time for the
- # ping/pong/rlse cyle to complete normally.
- self._update_timeout(10 + 10 * len(self._members))
-
- if self._abort:
- msg = "abrt"
- else:
- msg = "pong"
- logging.info(msg)
- remote.settimeout(self._remaining())
- remote.send(msg)
-
- elif reply == "rlse" or reply == "abrt":
- # Ensure we have sufficient time for the
- # ping/pong/rlse cyle to complete normally.
- self._update_timeout(10 + 10 * len(self._members))
-
- logging.info("was released, waiting for close")
-
- if mode == "rlse":
- pass
- elif mode == "wait":
- raise error.BarrierError("master abort -- barrier timeout")
- elif mode == "ping":
- raise error.BarrierError("master abort -- client lost")
- elif mode == "!tag":
- raise error.BarrierError("master abort -- incorrect tag")
- elif mode == "!dup":
- raise error.BarrierError("master abort -- duplicate client")
- elif mode == "abrt":
- raise BarrierAbortError("Client requested abort")
- else:
- raise error.BarrierError("master handshake failure: " + mode)
-
-
- def rendezvous(self, *hosts, **dargs):
- # if called with abort=True, this will raise an exception
- # on all the clients.
- self._start_time = time()
- self._members = list(hosts)
- self._members.sort()
- self._masterid = self._members.pop(0)
- self._abort = dargs.get('abort', False)
-
- logging.info("masterid: %s", self._masterid)
- if self._abort:
- logging.debug("%s is aborting", self._hostid)
- if not len(self._members):
- logging.info("No other members listed.")
- return
- logging.info("members: %s", ",".join(self._members))
-
- self._seen = 0
- self._waiting = {}
-
- # Figure out who is the master in this barrier.
- if self._hostid == self._masterid:
- logging.info("selected as master")
- self._run_server(is_master=True)
- else:
- logging.info("selected as slave")
- self._run_client(is_master=False)
-
-
- def rendezvous_servers(self, masterid, *hosts, **dargs):
- # if called with abort=True, this will raise an exception
- # on all the clients.
- self._start_time = time()
- self._members = list(hosts)
- self._members.sort()
- self._masterid = masterid
- self._abort = dargs.get('abort', False)
-
- logging.info("masterid: %s", self._masterid)
- if not len(self._members):
- logging.info("No other members listed.")
- return
- logging.info("members: %s", ",".join(self._members))
-
- self._seen = 0
- self._waiting = {}
-
- # Figure out who is the master in this barrier.
- if self._hostid == self._masterid:
- logging.info("selected as master")
- self._run_client(is_master=True)
- else:
- logging.info("selected as slave")
- self._run_server(is_master=False)
+_SITE_MODULE_NAME = 'autotest_lib.client.common_lib.site_barrier'
+listen_server = utils.import_site_symbol(
+ __file__, _SITE_MODULE_NAME, 'listen_server', listen_server)
+barrier = utils.import_site_symbol(
+ __file__, _SITE_MODULE_NAME, 'barrier', barrier)
==== (deleted)
//depot/google_vendor_src_branch/autotest/client/common_lib/barrier_unittest.py
====
--- autotest/client/common_lib/barrier_unittest.py 2010-04-13
16:00:38.000000000 -0700
+++ /dev/null 2009-12-17 12:29:38.000000000 -0800
@@ -1,202 +0,0 @@
-#!/usr/bin/python2.4
-
-__author__ = """Ashwin Ganti ([email protected])"""
-
-import os, sys, socket, errno, unittest, threading
-from time import time, sleep
-import common
-from autotest_lib.client.common_lib import error, barrier
-from autotest_lib.client.common_lib.test_utils import mock
-
-
-class listen_server_test(unittest.TestCase):
-
- def test_init(self):
- server = barrier.listen_server()
- server.close()
-
-
- def test_close(self):
- server = barrier.listen_server()
- # cannot bind on the same port again
- self.assertRaises(socket.error, barrier.listen_server)
- server.close()
- # now we can
- server = barrier.listen_server()
- server.close()
-
-
-class barrier_test(unittest.TestCase):
-
- def setUp(self):
- self.god = mock.mock_god()
- self.god.mock_io()
-
-
- def tearDown(self):
- self.god.unmock_io()
-
-
- def test_initialize(self):
- b = barrier.barrier('127.0.0.1#', 'testtag', 100, 11921)
- self.assertEqual(b._hostid, '127.0.0.1#')
- self.assertEqual(b._tag, 'testtag')
- self.assertEqual(b._timeout_secs, 100)
- self.assertEqual(b._port, 11921)
-
-
- def test_get_host_from_id(self):
- b = barrier.barrier('127.0.0.1#', 'testgethost', 100)
-
- hostname = b._get_host_from_id('my_host')
- self.assertEqual(hostname, 'my_host')
-
- hostname = b._get_host_from_id('my_host#')
- self.assertEqual(hostname, 'my_host')
-
- self.assertRaises(error.BarrierError, b._get_host_from_id, '#my_host')
-
-
- def test_update_timeout(self):
- b = barrier.barrier('127.0.0.1#', 'update', 100)
- b._update_timeout(120)
- self.assertEqual(b._timeout_secs, 120)
-
-
- def test_remaining(self):
- b = barrier.barrier('127.0.0.1#', 'remain', 100)
- remain = b._remaining()
- self.assertEqual(remain, 100)
-
-
- def test_master_welcome_garbage(self):
- b = barrier.barrier('127.0.0.1#', 'garbage', 100)
- waiting_before = dict(b._waiting)
- seen_before = b._seen
-
- sender, receiver = socket.socketpair()
- try:
- sender.send('GET /foobar?p=-1 HTTP/1.0\r\n\r\n')
- # This should not raise an exception.
- b._master_welcome((receiver, 'fakeaddr'))
-
- self.assertEqual(waiting_before, b._waiting)
- self.assertEqual(seen_before, b._seen)
-
- sender, receiver = socket.socketpair()
- sender.send('abcdefg\x00\x01\x02\n'*5)
- # This should not raise an exception.
- b._master_welcome((receiver, 'fakeaddr'))
-
- self.assertEqual(waiting_before, b._waiting)
- self.assertEqual(seen_before, b._seen)
- finally:
- sender.close()
- receiver.close()
-
-
- def test_rendezvous_basic(self):
- # Basic rendezvous testing
- self.rendezvous_test(60, port=11920)
-
-
- def test_rendezvous_timeout(self):
- # The rendezvous should time out here and throw a
- # BarrierError since we are specifying a timeout of 0
- self.assertRaises(error.BarrierError,
- self.rendezvous_test, 0, port=11921)
-
-
- def test_rendezvous_abort_ok(self):
- # Test with abort flag set to not abort.
- self.rendezvous_test(60, port=11920,
- test_abort=True, abort=False)
-
-
- def test_rendezvous_abort(self):
- # The rendezvous should abort here and throw a
- # BarrierError since we are asking to abort
- self.assertRaises(error.BarrierError,
- self.rendezvous_test, 0, port=11921,
- test_abort=True, abort=True)
-
-
- def test_rendezvous_servers_basic(self):
- # The rendezvous should time out here and throw a
- # BarrierError since we are specifying a timeout of 0
- self.rendezvous_test(60, port=11921,
- rendezvous_servers=True)
-
-
- def test_rendezvous_servers_timeout(self):
- # The rendezvous should time out here and throw a
- # BarrierError since we are specifying a timeout of 0
- self.assertRaises(error.BarrierError,
- self.rendezvous_test, 0, port=11922,
- rendezvous_servers=True)
-
-
- def test_rendezvous_servers_abort_ok(self):
- # Test with abort flag set to not abort.
- self.rendezvous_test(60, port=11920, rendezvous_servers=True,
- test_abort=True, abort=False)
-
-
- def test_rendezvous_servers_abort(self):
- # The rendezvous should abort here and throw a
- # BarrierError since we are asking to abort
- self.assertRaises(error.BarrierError,
- self.rendezvous_test, 0, port=11922,
- rendezvous_servers=True,
- test_abort=True, abort=True)
-
-
- # Internal utility function (not a unit test)
- def rendezvous_test(self, timeout, port=11922,
- rendezvous_servers=False, test_abort=False,
- abort=False, listen_server=None):
- if listen_server:
- port = None
-
- def _rdv(addr):
- b1 = barrier.barrier(addr, "test_meeting", timeout, port,
- listen_server=listen_server)
- if not rendezvous_servers:
- if test_abort:
- b1.rendezvous('127.0.0.1#0', '127.0.0.1#1', abort=abort)
- else:
- b1.rendezvous('127.0.0.1#0', '127.0.0.1#1')
- else:
- if test_abort:
- b1.rendezvous_servers('127.0.0.1#0', '127.0.0.1#1',
- abort=abort)
- else:
- b1.rendezvous_servers('127.0.0.1#0', '127.0.0.1#1')
-
-
- def _thread_rdv(addr):
- # We need to ignore the exception on one side.
- try:
- _rdv(addr)
- except error.BarrierError:
- pass
-
- client = threading.Thread(target=_thread_rdv,
- args=('127.0.0.1#0',))
- client.start()
- _rdv('127.0.0.1#1')
- client.join()
-
-
- def test_reusing_listen_server(self):
- """
- Test that reusing the same listen server object works.
- """
- server = barrier.listen_server()
- self.rendezvous_test(10, listen_server=server)
- self.rendezvous_test(10, listen_server=server)
- self.rendezvous_test(10, listen_server=server)
-
-
-if __name__ == "__main__":
- unittest.main()
--- autotest/client/common_lib/barrier.py 2010-04-13 16:00:38.000000000
-0700
+++ autotest/client/common_lib/base_barrier.py 2010-04-13 18:15:53.000000000
-0700
@@ -2,12 +2,8 @@
from time import time, sleep
from autotest_lib.client.common_lib import error
-# default barrier port
-_DEFAULT_PORT = 11922
-
-
-class BarrierAbortError(error.BarrierError):
- """Special BarrierError raised when an explicit abort is requested."""
+# Default barrier TCP port.
+DEFAULT_PORT = 11922
class listen_server(object):
@@ -23,7 +19,7 @@
@attr port: Port to bind to.
@attr socket: Listening socket object.
"""
- def __init__(self, address='', port=_DEFAULT_PORT):
+ def __init__(self, address='', port=DEFAULT_PORT):
"""
Create a listen_server instance for the given address/port.
@@ -143,7 +139,7 @@
'"port" and "listen_server" are mutually exclusive.')
self._port = listen_server.port
else:
- self._port = port or _DEFAULT_PORT
+ self._port = port or DEFAULT_PORT
self._server = listen_server # A listen_server instance or None.
self._members = [] # List of hosts we expect to find at the barrier.
self._timeout_secs = timeout
@@ -334,7 +330,7 @@
pass
if abort:
- raise BarrierAbortError("Client requested abort")
+ raise error.BarrierAbortError("Client requested abort")
def _waiting_close(self):
@@ -482,7 +478,7 @@
elif mode == "!dup":
raise error.BarrierError("master abort -- duplicate client")
elif mode == "abrt":
- raise BarrierAbortError("Client requested abort")
+ raise error.BarrierAbortError("Client requested abort")
else:
raise error.BarrierError("master handshake failure: " + mode)
--- autotest/client/common_lib/barrier_unittest.py 2010-04-13
16:00:38.000000000 -0700
+++ autotest/client/common_lib/base_barrier_unittest.py 2010-04-13
18:15:53.000000000 -0700
@@ -5,7 +5,8 @@
import os, sys, socket, errno, unittest, threading
from time import time, sleep
import common
-from autotest_lib.client.common_lib import error, barrier
+from autotest_lib.client.common_lib import error, base_barrier
+barrier = base_barrier
from autotest_lib.client.common_lib.test_utils import mock
--- autotest/client/common_lib/error.py 2010-04-13 18:15:53.000000000 -0700
+++ autotest/client/common_lib/error.py 2010-04-13 18:15:53.000000000 -0700
@@ -143,6 +143,11 @@
pass
+class BarrierAbortError(BarrierError):
+ """Indicate that the barrier was explicitly aborted by a member."""
+ pass
+
+
class InstallError(JobError):
"""Indicates an installation error which Terminates and fails the job."""
pass
--- autotest/client/common_lib/utils.py 2010-04-12 09:44:38.000000000 -0700
+++ autotest/client/common_lib/utils.py 2010-04-13 18:15:53.000000000 -0700
@@ -8,7 +8,7 @@
import hashlib
except ImportError:
import md5, sha
-from autotest_lib.client.common_lib import error, barrier, logging_manager
+from autotest_lib.client.common_lib import error, logging_manager
def deprecated(func):
"""This is a decorator which can be used to mark functions as deprecated.
@@ -760,112 +760,6 @@
return cpu_percent, to_return
-"""
-This function is used when there is a need to run more than one
-job simultaneously starting exactly at the same time. It basically returns
-a modified control file (containing the synchronization code prepended)
-whenever it is ready to run the control file. The synchronization
-is done using barriers to make sure that the jobs start at the same time.
-
-Here is how the synchronization is done to make sure that the tests
-start at exactly the same time on the client.
-sc_bar is a server barrier and s_bar, c_bar are the normal barriers
-
- Job1 Job2 ...... JobN
- Server: | sc_bar
- Server: | s_bar ...... s_bar
- Server: | at.run() at.run() ...... at.run()
- ----------|------------------------------------------------------
- Client | sc_bar
- Client | c_bar c_bar ...... c_bar
- Client | <run test> <run test> ...... <run test>
-
-
-PARAMS:
- control_file : The control file which to which the above synchronization
- code would be prepended to
- host_name : The host name on which the job is going to run
- host_num (non negative) : A number to identify the machine so that we have
- different sets of s_bar_ports for each of the machines.
- instance : The number of the job
- num_jobs : Total number of jobs that are going to run in parallel with
- this job starting at the same time
- port_base : Port number that is used to derive the actual barrier ports.
-
-RETURN VALUE:
- The modified control file.
-
-"""
-def get_sync_control_file(control, host_name, host_num,
- instance, num_jobs, port_base=63100):
- sc_bar_port = port_base
- c_bar_port = port_base
- if host_num < 0:
- print "Please provide a non negative number for the host"
- return None
- s_bar_port = port_base + 1 + host_num # The set of s_bar_ports are
- # the same for a given machine
-
- sc_bar_timeout = 180
- s_bar_timeout = c_bar_timeout = 120
-
- # The barrier code snippet is prepended into the conrol file
- # dynamically before at.run() is called finally.
- control_new = []
-
- # jobid is the unique name used to identify the processes
- # trying to reach the barriers
- jobid = "%s#%d" % (host_name, instance)
-
- rendv = []
- # rendvstr is a temp holder for the rendezvous list of the processes
- for n in range(num_jobs):
- rendv.append("'%s#%d'" % (host_name, n))
- rendvstr = ",".join(rendv)
-
- if instance == 0:
- # Do the setup and wait at the server barrier
- # Clean up the tmp and the control dirs for the first instance
- control_new.append('if os.path.exists(job.tmpdir):')
- control_new.append("\t system('umount -f %s > /dev/null"
- "2> /dev/null' % job.tmpdir,"
- "ignore_status=True)")
- control_new.append("\t system('rm -rf ' + job.tmpdir)")
- control_new.append(
- 'b0 = job.barrier("%s", "sc_bar", %d, port=%d)'
- % (jobid, sc_bar_timeout, sc_bar_port))
- control_new.append(
- 'b0.rendezvous_servers("PARALLEL_MASTER", "%s")'
- % jobid)
-
- elif instance == 1:
- # Wait at the server barrier to wait for instance=0
- # process to complete setup
- b0 = barrier.barrier("PARALLEL_MASTER", "sc_bar", sc_bar_timeout,
- port=sc_bar_port)
- b0.rendezvous_servers("PARALLEL_MASTER", jobid)
-
- if(num_jobs > 2):
- b1 = barrier.barrier(jobid, "s_bar", s_bar_timeout,
- port=s_bar_port)
- b1.rendezvous(rendvstr)
-
- else:
- # For the rest of the clients
- b2 = barrier.barrier(jobid, "s_bar", s_bar_timeout, port=s_bar_port)
- b2.rendezvous(rendvstr)
-
- # Client side barrier for all the tests to start at the same time
- control_new.append('b1 = job.barrier("%s", "c_bar", %d, port=%d)'
- % (jobid, c_bar_timeout, c_bar_port))
- control_new.append("b1.rendezvous(%s)" % rendvstr)
-
- # Stick in the rest of the control file
- control_new.append(control)
-
- return "\n".join(control_new)
-
-
def get_arch(run_function=run):
"""
Get the hardware architecture of the machine.
--- autotest/client/tests/barriertest/barriertest.py 2010-04-13
16:00:38.000000000 -0700
+++ autotest/client/tests/barriertest/barriertest.py 2010-04-13
18:15:53.000000000 -0700
@@ -2,45 +2,69 @@
import logging, time
from autotest_lib.client.bin import test
-from autotest_lib.client.common_lib import barrier
+from autotest_lib.client.common_lib import barrier, error
class barriertest(test.test):
version = 2
- def run_once(self, our_addr, hostnames, master):
+ def run_once(self, our_addr, hostnames, master, timeout=120):
# A reusable local server as we're using multiple barriers in one test.
server = barrier.listen_server()
# Basic barrier rendezvous test.
- self.job.barrier(our_addr, 'First', timeout=60, listen_server=server
- ).rendezvous(*hostnames)
+ self.job.barrier(our_addr, 'First', timeout=timeout,
+ listen_server=server).rendezvous(*hostnames)
logging.info('1. rendezvous "First" complete.')
time.sleep(2)
# A rendezvous_servers using a different master than the default.
- self.job.barrier(our_addr, 'Second', timeout=60, listen_server=server
+ self.job.barrier(our_addr, 'Second', timeout=timeout,
+ listen_server=server
).rendezvous_servers(hostnames[-1], *hostnames[:-1])
logging.info('2. rendezvous_servers "Second" complete.')
time.sleep(2)
# A regular rendezvous, this time testing the abort functionality.
try:
- self.job.barrier(our_addr, 'Third', timeout=60,
+ self.job.barrier(our_addr, 'WillAbort', timeout=timeout,
listen_server=server
).rendezvous(abort=True, *hostnames)
- except barrier.BarrierAbortError:
+ except error.BarrierAbortError:
pass
+ except error.BarrierError, e:
+ # We did get an error from the barrier, but was it acceptable or
+ # not? Site code may not be able to indicate an explicit abort.
+ self.job.record('WARN', None, 'barriertest',
+ 'BarrierError %s instead of BarrierAbortError.' %
e)
else:
raise error.TestFail('Explicit barrier rendezvous abort failed.')
- logging.info('3. rendezvous(abort=True) "Third" complete.')
+ logging.info('3. rendezvous(abort=True) complete.')
time.sleep(2)
# Now attempt a rendezvous_servers that also includes the server.
- self.job.barrier(our_addr, 'Final', timeout=60, listen_server=server
+ self.job.barrier(our_addr, 'FinalSync', timeout=timeout,
+ listen_server=server
).rendezvous_servers(master, *hostnames)
- logging.info('N. rendezvous_servers "Final" complete.')
+ logging.info('4. rendezvous_servers "FinalSync" complete.')
+ time.sleep(2)
+
+ # rendezvous_servers, aborted from the master.
+ try:
+ self.job.barrier(our_addr, 'WillAbortServers', timeout=timeout,
+ listen_server=server
+ ).rendezvous_servers(master, *hostnames)
+ except error.BarrierAbortError:
+ pass
+ except error.BarrierError, e:
+ # We did get an error from the barrier, but was it acceptable or
+ # not? Site code may not be able to indicate an explicit abort.
+ self.job.record('WARN', None, 'barriertest',
+ 'BarrierError %s instead of BarrierAbortError.' %
e)
+ else:
+ raise error.TestFail('Explicit barrier rendezvous abort failed.')
+ logging.info('5. rendezvous_servers(abort=True) complete.')
time.sleep(2)
server.close()
--- autotest/server/base_utils.py 2010-04-13 18:15:53.000000000 -0700
+++ autotest/server/base_utils.py 2010-04-13 18:15:53.000000000 -0700
@@ -10,7 +10,7 @@
import atexit, os, re, shutil, textwrap, sys, tempfile, types
-from autotest_lib.client.common_lib import utils
+from autotest_lib.client.common_lib import barrier, utils
from autotest_lib.server import subcommand
@@ -323,3 +323,106 @@
public_key.close()
return public_key_str
+
+
+def get_sync_control_file(control, host_name, host_num,
+ instance, num_jobs, port_base=63100):
+ """
+ This function is used when there is a need to run more than one
+ job simultaneously starting exactly at the same time. It basically returns
+ a modified control file (containing the synchronization code prepended)
+ whenever it is ready to run the control file. The synchronization
+ is done using barriers to make sure that the jobs start at the same time.
+
+ Here is how the synchronization is done to make sure that the tests
+ start at exactly the same time on the client.
+ sc_bar is a server barrier and s_bar, c_bar are the normal barriers
+
+ Job1 Job2 ...... JobN
+ Server: | sc_bar
+ Server: | s_bar ...... s_bar
+ Server: | at.run() at.run() ...... at.run()
+ ----------|------------------------------------------------------
+ Client | sc_bar
+ Client | c_bar c_bar ...... c_bar
+ Client | <run test> <run test> ...... <run test>
+
+ @param control: The control file to which the above synchronization
+ code will be prepended.
+ @param host_name: The host name on which the job is going to run.
+ @param host_num: (non negative) A number to identify the machine so that
+ we have different sets of s_bar_ports for each of the machines.
+ @param instance: The number of the job
+ @param num_jobs: Total number of jobs that are going to run in parallel
+ with this job starting at the same time.
+ @param port_base: Port number that is used to derive the actual barrier
+ ports.
+
+ @returns The modified control file.
+ """
+ sc_bar_port = port_base
+ c_bar_port = port_base
+ if host_num < 0:
+ print "Please provide a non negative number for the host"
+ return None
+ s_bar_port = port_base + 1 + host_num # The set of s_bar_ports are
+ # the same for a given machine
+
+ sc_bar_timeout = 180
+ s_bar_timeout = c_bar_timeout = 120
+
+ # The barrier code snippet is prepended to the control file
+ # dynamically, before at.run() is finally called.
+ control_new = []
+
+ # jobid is the unique name used to identify the processes
+ # trying to reach the barriers
+ jobid = "%s#%d" % (host_name, instance)
+
+ rendv = []
+ # rendvstr is a temp holder for the rendezvous list of the processes
+ for n in range(num_jobs):
+ rendv.append("'%s#%d'" % (host_name, n))
+ rendvstr = ",".join(rendv)
+
+ if instance == 0:
+ # Do the setup and wait at the server barrier
+ # Clean up the tmp and the control dirs for the first instance
+ control_new.append('if os.path.exists(job.tmpdir):')
+ control_new.append("\t system('umount -f %s > /dev/null "
+ "2> /dev/null' % job.tmpdir,"
+ "ignore_status=True)")
+ control_new.append("\t system('rm -rf ' + job.tmpdir)")
+ control_new.append(
+ 'b0 = job.barrier("%s", "sc_bar", %d, port=%d)'
+ % (jobid, sc_bar_timeout, sc_bar_port))
+ control_new.append(
+ 'b0.rendezvous_servers("PARALLEL_MASTER", "%s")'
+ % jobid)
+
+ elif instance == 1:
+ # Wait at the server barrier to wait for instance=0
+ # process to complete setup
+ b0 = barrier.barrier("PARALLEL_MASTER", "sc_bar", sc_bar_timeout,
+ port=sc_bar_port)
+ b0.rendezvous_servers("PARALLEL_MASTER", jobid)
+
+ if(num_jobs > 2):
+ b1 = barrier.barrier(jobid, "s_bar", s_bar_timeout,
+ port=s_bar_port)
+ b1.rendezvous(rendvstr)
+
+ else:
+ # For the rest of the clients
+ b2 = barrier.barrier(jobid, "s_bar", s_bar_timeout, port=s_bar_port)
+ b2.rendezvous(rendvstr)
+
+ # Client side barrier for all the tests to start at the same time
+ control_new.append('b1 = job.barrier("%s", "c_bar", %d, port=%d)'
+ % (jobid, c_bar_timeout, c_bar_port))
+ control_new.append("b1.rendezvous(%s)" % rendvstr)
+
+ # Stick in the rest of the control file
+ control_new.append(control)
+
+ return "\n".join(control_new)
--- autotest/server/tests/barriertest/control.srv 2010-04-13
16:00:38.000000000 -0700
+++ autotest/server/tests/barriertest/control.srv 2010-04-13
18:15:53.000000000 -0700
@@ -11,7 +11,7 @@
events across multiple hosts.
"""
-from autotest_lib.client.common_lib import barrier
+from autotest_lib.client.common_lib import barrier, error
from autotest_lib.server import autotest, hosts, subcommand
if len(machines) > 3:
@@ -35,9 +35,27 @@
subcommand_list.append(subcommand.subcommand(
host_at.run, (control, host.hostname)))
-# Synchronize with all of the clients launched above from the autoserv host.
-final_barrier = barrier.barrier(master, 'Final', timeout=600)
-subcommand_list.append(subcommand.subcommand(
- final_barrier.rendezvous_servers, [master] + machines))
+listen_server = barrier.listen_server()
+aborting_barrier = barrier.barrier(master, 'WillAbortServers', timeout=600,
+ listen_server=listen_server)
+final_barrier = barrier.barrier(master, 'FinalSync', timeout=600,
+ listen_server=listen_server)
+def verify_server_barriers():
+ # Synchronize with all of the clients launched from the autoserv host.
+ final_barrier.rendezvous_servers(master, *machines)
+
+ try:
+ aborting_barrier.rendezvous_servers(master, abort=True, *machines)
+ except error.BarrierAbortError:
+ pass
+ except error.BarrierError, e:
+ # We did get an error from the barrier, but was it acceptable or
+ # not? Site code may not be able to indicate an explicit abort.
+ job.record('WARN', None, 'barriertest',
+ 'BarrierError %s instead of BarrierAbortError.' % e)
+ else:
+ raise error.TestFail('Explicit barrier rendezvous abort failed.')
+
+subcommand_list.append(subcommand.subcommand(verify_server_barriers, ()))
subcommand.parallel(subcommand_list)
--- autotest/utils/unittest_suite.py 2010-04-13 18:15:53.000000000 -0700
+++ autotest/utils/unittest_suite.py 2010-04-13 18:15:53.000000000 -0700
@@ -61,7 +61,7 @@
))
LONG_RUNTIME = set((
- 'barrier_unittest.py',
+ 'base_barrier_unittest.py',
'logging_manager_test.py',
))
_______________________________________________
Autotest mailing list
[email protected]
http://test.kernel.org/cgi-bin/mailman/listinfo/autotest