URL: https://github.com/freeipa/freeipa/pull/267
Author: tomaskrizek
 Title: #267: ipa-replica-conncheck: do not close listening ports until required
Action: synchronized

To pull the PR as Git branch:
git remote add ghfreeipa https://github.com/freeipa/freeipa
git fetch ghfreeipa pull/267/head:pr267
git checkout pr267
From cbb58aa8cb695ea8529001ebca303858927a73af Mon Sep 17 00:00:00 2001
From: Tomas Krizek <tkri...@redhat.com>
Date: Wed, 23 Nov 2016 13:55:14 +0100
Subject: [PATCH] ipa-replica-conncheck: do not close listening ports until
 required

Previously, a separate thread would be created for each socket used
for conncheck. It would also time out after one second, after which it
would be closed and reopened again. This caused random failures of
conncheck.

Now all sockets are handled in a single thread and once the server
starts to listen on a port, it does not close that connection until the
script finishes.

Only IPv6 socket is used for simplicity, since it can handle both IPv6
and IPv4 connections. This requires IPv6 kernel support, which is
required by other parts of IPA anyway.

https://fedorahosted.org/freeipa/ticket/6487
---
 install/tools/ipa-replica-conncheck | 151 +++++++++++++++++++++++++++---------
 ipapython/ipautil.py                |  71 -----------------
 2 files changed, 113 insertions(+), 109 deletions(-)

diff --git a/install/tools/ipa-replica-conncheck b/install/tools/ipa-replica-conncheck
index 7ec1ef8..5aae284 100755
--- a/install/tools/ipa-replica-conncheck
+++ b/install/tools/ipa-replica-conncheck
@@ -31,14 +31,16 @@ import ipaclient.ipachangeconf
 from optparse import OptionGroup, OptionValueError
 # pylint: enable=deprecated-module
 from ipapython.ipa_log_manager import root_logger, standard_logging_setup
+import copy
 import sys
 import os
 import signal
 import tempfile
+import select
 import socket
 import time
 import threading
-import errno
+import traceback
 from socket import SOCK_STREAM, SOCK_DGRAM
 import distutils.spawn
 from ipaplatform.paths import paths
@@ -46,11 +48,12 @@ import gssapi
 from cryptography.hazmat.primitives import serialization
 
 CONNECT_TIMEOUT = 5
-RESPONDERS = [ ]
+RESPONDER = None
 QUIET = False
 CCACHE_FILE = None
 KRB5_CONFIG = None
 
+
 class SshExec(object):
     def __init__(self, user, addr):
         self.user = user
@@ -96,6 +99,7 @@ class CheckedPort(object):
         self.port_type = port_type
         self.description = description
 
+
 BASE_PORTS = [
                 CheckedPort(389, SOCK_STREAM, "Directory Service: Unsecure port"),
                 CheckedPort(636, SOCK_STREAM, "Directory Service: Secure port"),
@@ -112,6 +116,7 @@ def print_info(msg):
     if not QUIET:
         print(msg)
 
+
 def parse_options():
     def ca_cert_file_callback(option, opt, value, parser):
         if not os.path.exists(value):
@@ -211,6 +216,7 @@ def parse_options():
 
     return safe_options, options
 
+
 def logging_setup(options):
     log_file = None
 
@@ -219,16 +225,6 @@ def logging_setup(options):
 
     standard_logging_setup(log_file, debug=options.debug)
 
-def clean_responders(responders):
-    if not responders:
-        return
-
-    for responder in responders:
-        responder.stop()
-
-    for responder in responders:
-        responder.join()
-        responders.remove(responder)
 
 def sigterm_handler(signum, frame):
     # do what SIGINT does (raise a KeyboardInterrupt)
@@ -236,6 +232,7 @@ def sigterm_handler(signum, frame):
     if callable(sigint_handler):
         sigint_handler(signum, frame)
 
+
 def configure_krb5_conf(realm, kdc, filename):
 
     krbconf = ipaclient.ipachangeconf.IPAChangeConf("IPA Installer")
@@ -283,32 +280,107 @@ def configure_krb5_conf(realm, kdc, filename):
 
     krbconf.newConf(filename, opts)
 
+
 class PortResponder(threading.Thread):
 
-    def __init__(self, port, port_type, socket_timeout=1):
+    PROTO = {socket.SOCK_STREAM: 'tcp',
+             socket.SOCK_DGRAM: 'udp'}
+
+    def __init__(self, ports):
+        """
+        ports: a list of CheckedPort
+        """
         super(PortResponder, self).__init__()
-        self.port = port
-        self.port_type = port_type
-        self.socket_timeout = socket_timeout
-        self._stop_request = False
+        # copy ports to avoid the need to synchronize it between threads
+        self.ports = copy.deepcopy(ports)
+        self._sockets = []
+        self._close = False
+        self._close_lock = threading.Lock()
+        self.responder_data = 'FreeIPA'
+        self.ports_open = threading.Condition()
 
     def run(self):
-        while not self._stop_request:
+        root_logger.debug('Starting listening thread.')
+
+        for port in self.ports:
+            self._bind_to_port(port.port, port.port_type)
+        with self.ports_open:
+            root_logger.debug('Ports opened, notify original thread')
+            self.ports_open.notify()
+
+        while not self._is_closing():
+            ready_socks, _socks1, _socks2 = select.select(
+                self._sockets, [], [], 1)
+            if ready_socks:
+                ready_sock = ready_socks[0]
+                self._respond(ready_sock)
+
+        for sock in self._sockets:
+            port = sock.getsockname()[1]
+            proto = PortResponder.PROTO[sock.type]
+            sock.close()
+            root_logger.debug('%(port)d %(proto)s: Stopped listening' %
+                              dict(port=port, proto=proto))
+
+    def _is_closing(self):
+        with self._close_lock:
+            return self._close
+
+    def _bind_to_port(self, port, socket_type):
+        # Use IPv6 socket as it is able to accept both IPv6 and IPv4
+        # connections. Since IPv6 kernel module is required by other
+        # parts of IPA, it should always be available.
+        family = socket.AF_INET6
+        host = '::'   # all available interfaces
+        proto = PortResponder.PROTO[socket_type]
+
+        try:
+            sock = socket.socket(family, socket_type)
+
+            # Make sure IPv4 clients can connect to IPv6 socket
+            sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0)
+
+            if socket_type == socket.SOCK_STREAM:
+                sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+
+            sock.bind((host, port))
+            if socket_type == socket.SOCK_STREAM:
+                # There might be a delay before accepting the connection,
+                # because a single thread is used to handle all the
+                # connections. Thus a backlog size of at least 1 is needed.
+                sock.listen(1)
+
+            root_logger.debug('%(port)d %(proto)s: Started listening' %
+                              dict(port=port, proto=proto))
+        except socket.error as e:
+            root_logger.warning('%(port)d %(proto)s: Failed to bind' %
+                                dict(port=port, proto=proto))
+            root_logger.debug(traceback.format_exc(e))
+        else:
+            self._sockets.append(sock)
+
+    def _respond(self, sock):
+        port = sock.getsockname()[1]
+        if sock.type == socket.SOCK_STREAM:
+            connection, addr = sock.accept()
             try:
-                ipautil.bind_port_responder(self.port,
-                        self.port_type,
-                        socket_timeout=self.socket_timeout,
-                        responder_data="FreeIPA")
-            except socket.timeout:
-                pass
-            except socket.error as e:
-                if e.errno == errno.EADDRINUSE:
-                    time.sleep(1)
-                else:
-                    raise
+                connection.sendall(self.responder_data)
+                root_logger.debug('%(port)d tcp: Responded to %(addr)s' %
+                                  dict(port=port, addr=addr[0]))
+            finally:
+                connection.close()
+        elif sock.type == socket.SOCK_DGRAM:
+            _data, addr = sock.recvfrom(1)
+            sock.sendto(self.responder_data, addr)
+            root_logger.debug('%(port)d udp: Responded to %(addr)s' %
+                              dict(port=port, addr=addr[0]))
 
     def stop(self):
-        self._stop_request = True
+        root_logger.debug('Stopping listening thread.')
+
+        with self._close_lock:
+            self._close = True
+
 
 def port_check(host, port_list):
     ports_failed = []
@@ -344,7 +416,9 @@ def port_check(host, port_list):
         raise RuntimeError("Port check failed! Inaccessible port(s): %s" \
                 % ", ".join(msg_ports))
 
+
 def main():
+    global RESPONDER
     safe_options, options = parse_options()
 
     logging_setup(options)
@@ -386,11 +460,11 @@ def main():
         # create listeners
         print_info("Start listening on required ports for remote master check")
 
-        for port in required_ports:
-            root_logger.debug("Start listening on port %d (%s)" % (port.port, port.description))
-            responder = PortResponder(port.port, port.port_type)
-            responder.start()
-            RESPONDERS.append(responder)
+        RESPONDER = PortResponder(required_ports)
+        RESPONDER.start()
+        with RESPONDER.ports_open:
+            RESPONDER.ports_open.wait()
+            root_logger.debug('Original thread resumed')
 
         remote_check_opts = ['--replica %s' % options.hostname]
 
@@ -550,18 +624,19 @@ def main():
             time.sleep(3600)
             print_info("Connection check timeout: terminating listening program")
 
+
 if __name__ == "__main__":
     try:
         sys.exit(main())
-    except SystemExit as e:
-        sys.exit(e)
     except KeyboardInterrupt:
         print_info("\nCleaning up...")
         sys.exit(1)
     except RuntimeError as e:
         sys.exit(e)
     finally:
-        clean_responders(RESPONDERS)
+        if RESPONDER is not None:
+            RESPONDER.stop()
+            RESPONDER.join()
         for file_name in (CCACHE_FILE, KRB5_CONFIG):
             if file_name:
                 try:
diff --git a/ipapython/ipautil.py b/ipapython/ipautil.py
index 472ba35..a1be569 100644
--- a/ipapython/ipautil.py
+++ b/ipapython/ipautil.py
@@ -981,77 +981,6 @@ def host_port_open(host, port, socket_type=socket.SOCK_STREAM, socket_timeout=No
 
     return False
 
-def bind_port_responder(port, socket_type=socket.SOCK_STREAM, socket_timeout=None, responder_data=None):
-    host = None   # all available interfaces
-    last_socket_error = None
-
-    # At first try to create IPv6 socket as it is able to accept both IPv6 and
-    # IPv4 connections (when not turned off)
-    families = (socket.AF_INET6, socket.AF_INET)
-    s = None
-
-    for family in families:
-        try:
-            addr_infos = socket.getaddrinfo(host, port, family, socket_type, 0,
-                            socket.AI_PASSIVE)
-        except socket.error as e:
-            last_socket_error = e
-            continue
-        for res in addr_infos:
-            af, socktype, proto, _canonname, sa = res
-            try:
-                s = socket.socket(af, socktype, proto)
-            except socket.error as e:
-                last_socket_error = e
-                s = None
-                continue
-
-            if socket_timeout is not None:
-                s.settimeout(1)
-
-            if af == socket.AF_INET6:
-                try:
-                    # Make sure IPv4 clients can connect to IPv6 socket
-                    s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0)
-                except socket.error:
-                    pass
-
-            if socket_type == socket.SOCK_STREAM:
-                s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
-
-            try:
-                s.bind(sa)
-
-                while True:
-                    if socket_type == socket.SOCK_STREAM:
-                        s.listen(1)
-                        connection, _client_address = s.accept()
-                        try:
-                            if responder_data:
-                                connection.sendall(responder_data)
-                        finally:
-                            connection.close()
-                    elif socket_type == socket.SOCK_DGRAM:
-                        _data, addr = s.recvfrom(1)
-
-                        if responder_data:
-                            s.sendto(responder_data, addr)
-            except socket.timeout:
-                # Timeout is expectable as it was requested by caller, raise
-                # the exception back to him
-                raise
-            except socket.error as e:
-                last_socket_error = e
-                s.close()
-                s = None
-                continue
-            finally:
-                if s:
-                    s.close()
-
-    if s is None and last_socket_error is not None:
-        raise last_socket_error # pylint: disable=E0702
-
 
 def reverse_record_exists(ip_address):
     """
-- 
Manage your subscription for the Freeipa-devel mailing list:
https://www.redhat.com/mailman/listinfo/freeipa-devel
Contribute to FreeIPA: http://www.freeipa.org/page/Contribute/Code

Reply via email to