Moved binding/listening on the server socket from
barrier.barrier.run_server() to a separate class
(barrier.listen_server). Added a "listen_server" keyword argument to
barrier so that callers can pass in a listen_server instance, which
barrier then uses whenever it needs access to the server socket. When
no listen_server instance is given, barrier creates one in run_server()
and closes it when run_server() exits (preserving the old semantics).
Updated barriertest (used by the server-side profiler support code) to
reuse a single listen_server instance in almost all of its barrier
instances. This will hopefully fix an issue where we rebind too quickly
on the same port while packets from an old connection are still in
transit, causing a new connection to get an ECONNRESET. Updated the
unit tests.

Signed-off-by: Mihai Rusu <[email protected]>

--- autotest/client/common_lib/barrier.py       2010-03-15 16:03:28.000000000 -0700
+++ autotest/client/common_lib/barrier.py       2010-03-15 16:03:28.000000000 -0700
@@ -2,6 +2,50 @@
 from time import time, sleep
 from autotest_lib.client.common_lib import error
 
+# default barrier port
+_DEFAULT_PORT = 11922
+
+class listen_server(object):
+    """
+    Manages a listening socket for barrier.
+
+    Can be used to run multiple barrier instances with the same listening
+    socket (if they were going to listen on the same port).
+
+    Attributes:
+        @attribute address: address where to bind (string)
+        @attribute port: port where to bind
+        @attribute socket: listening socket object
+    """
+    def __init__(self, address='', port=_DEFAULT_PORT):
+        """Create a listen_server instance for the given address/port.
+
+        @param address: on what address to listen to
+        @param port: on what port to listen to
+        """
+        self.address = address
+        self.port = port
+        self.socket = self._setup()
+
+
+    def _setup(self):
+        """
+        Create, bind and listen on the listening socket.
+        """
+        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+        sock.bind((self.address, self.port))
+        sock.listen(10)
+
+        return sock
+
+
+    def close(self):
+        """
+        Close the listening socket.
+        """
+        self.socket.close()
+
 
 class barrier(object):
     """ Multi-machine barrier support
@@ -95,12 +139,28 @@
                     Clients who have checked in and are waiting (master)
             masterid
                     Hostname/IP address + optional tag of selected master
+            server
+                    External listen server instance to use instead of creating
+                    our own. Prefer to create a listen_server instance and
+                    reuse it in multiple barrier instances so that the barrier
+                    code doesn't have to re-bind on the same port very fast
+                    (and if packets are in transit they may reset new
+                    connections).
     """
 
-    def __init__(self, hostid, tag, timeout=None, port=11922):
+    def __init__(self, hostid, tag, timeout=None, port=None,
+                 listen_server=None):
         self.hostid = hostid
         self.tag = tag
-        self.port = port
+        if listen_server:
+            if port:
+                raise error.BarrierError(
+                        'Received both "port" and "listen_server" arguments '
+                        '(provide only one of them)')
+            self.port = listen_server.port
+        else:
+            self.port = port or _DEFAULT_PORT
+        self.server = listen_server
         self.timeout = timeout
         self.start = None
         logging.info("tag=%s port=%d timeout=%r",
@@ -303,18 +363,14 @@
 
 
     def run_server(self, is_master):
-        self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-        self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
-        self.server.bind(('', self.port))
-        self.server.listen(10)
-
+        server = self.server or listen_server(port=self.port)
         failed = 0
         try:
             while 1:
                 try:
                     # Wait for callers welcoming each.
-                    self.server.settimeout(self.remaining())
-                    connection = self.server.accept()
+                    server.socket.settimeout(self.remaining())
+                    connection = server.socket.accept()
                     if is_master:
                         self.master_welcome(connection)
                     else:
@@ -336,13 +392,12 @@
                         logging.info("slave connected to master")
                         self.slave_wait()
                         break
-
-            self.waiting_close()
-            self.server.close()
-        except:
+        finally:
             self.waiting_close()
-            self.server.close()
-            raise
+            # if we created the listening_server in the beginning of this
+            # function then close the listening socket here
+            if not self.server:
+                server.close()
 
 
     def run_client(self, is_master):
--- autotest/client/common_lib/barrier_unittest.py      2010-03-15 16:03:28.000000000 -0700
+++ autotest/client/common_lib/barrier_unittest.py      2010-03-15 16:03:28.000000000 -0700
@@ -9,6 +9,23 @@
 from autotest_lib.client.common_lib.test_utils import mock
 
 
+class listen_server_test(unittest.TestCase):
+
+    def test_init(self):
+        server = barrier.listen_server()
+        server.close()
+
+
+    def test_close(self):
+        server = barrier.listen_server()
+        # cannot bind on the same port again
+        self.assertRaises(socket.error, barrier.listen_server)
+        server.close()
+        # now we can
+        server = barrier.listen_server()
+        server.close()
+
+
 class barrier_test(unittest.TestCase):
 
     def setUp(self):
@@ -137,9 +154,13 @@
     # Internal utility function (not a unit test)
     def rendezvous_test(self, timeout, port=11922,
                         rendezvous_servers=False, test_abort=False,
-                        abort=False):
+                        abort=False, listen_server=None):
+        if listen_server:
+            port = None
+
         def _rdv(addr):
-            b1 = barrier.barrier(addr, "test_meeting", timeout, port)
+            b1 = barrier.barrier(addr, "test_meeting", timeout, port,
+                                 listen_server=listen_server)
             if not rendezvous_servers:
                 if test_abort:
                     b1.rendezvous('127.0.0.1#0', '127.0.0.1#1', abort=abort)
@@ -158,8 +179,7 @@
             try:
                 _rdv(addr)
             except error.BarrierError:
-                if timeout == 0:
-                    pass
+                pass
 
         client = threading.Thread(target=_thread_rdv,
                                   args=('127.0.0.1#0',))
@@ -168,5 +188,15 @@
         client.join()
 
 
+    def test_reusing_listen_server(self):
+        """
+        Test that reusing the same listen server object works.
+        """
+        server = barrier.listen_server()
+        self.rendezvous_test(10, listen_server=server)
+        self.rendezvous_test(10, listen_server=server)
+        self.rendezvous_test(10, listen_server=server)
+
+
 if __name__ == "__main__":
     unittest.main()
--- autotest/client/tests/barriertest/barriertest.py    2010-03-15 16:03:28.000000000 -0700
+++ autotest/client/tests/barriertest/barriertest.py    2010-03-15 16:03:28.000000000 -0700
@@ -1,5 +1,6 @@
 import time
 from autotest_lib.client.bin import test
+from autotest_lib.client.common_lib import barrier
 
 class barriertest(test.test):
     version = 1
@@ -9,12 +10,13 @@
                     hostid, masterid, all_ids):
         profilers = self.job.profilers
 
+        barrier_server = barrier.listen_server(port=11920)
         b0 = self.job.barrier(hostid, "sync_profilers", timeout_start,
-                              port=11920)
+                              listen_server=barrier_server)
         b0.rendezvous_servers(masterid, hostid)
 
         b1 = self.job.barrier(hostid, "start_profilers", timeout_start,
-                              port=11920)
+                              listen_server=barrier_server)
         b1.rendezvous_servers(masterid, hostid)
 
         b2 = self.job.barrier(hostid, "local_sync_profilers", timeout_sync)
@@ -23,12 +25,14 @@
         profilers.start(self)
 
         b3 = self.job.barrier(hostid, "stop_profilers", timeout_stop,
-                              port=11920)
+                              listen_server=barrier_server)
         b3.rendezvous_servers(masterid, hostid)
 
         profilers.stop(self)
         profilers.report(self)
 
         b4 = self.job.barrier(hostid, "finish_profilers", timeout_stop,
-                              port=11920)
+                              listen_server=barrier_server)
         b4.rendezvous_servers(masterid, hostid)
+
+        barrier_server.close()
_______________________________________________
Autotest mailing list
[email protected]
http://test.kernel.org/cgi-bin/mailman/listinfo/autotest

Reply via email to