This is an automated email from the ASF dual-hosted git repository.

aonishuk pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/ambari.git
The following commit(s) were added to refs/heads/trunk by this push: new 54ba891 AMBARI-23222. Ambari-agent fails to connect to server with two_way_auth enabled (aonishuk) 54ba891 is described below commit 54ba89133f61b8ad013d96f4f8d881844229abd9 Author: Andrew Onishuk <aonis...@hortonworks.com> AuthorDate: Tue Mar 13 20:56:07 2018 +0200 AMBARI-23222. Ambari-agent fails to connect to server with two_way_auth enabled (aonishuk) --- .../main/python/ambari_agent/HeartbeatThread.py | 3 +- .../src/main/python/ambari_agent/security.py | 94 ++++++++-------------- .../src/test/python/ambari_agent/TestSecurity.py | 53 ------------ .../main/python/ambari_stomp/adapter/websocket.py | 8 +- 4 files changed, 40 insertions(+), 118 deletions(-) diff --git a/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py b/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py index d559fac..2a4ae56 100644 --- a/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py +++ b/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py @@ -213,7 +213,8 @@ class HeartbeatThread(threading.Thread): Create a stomp connection """ connection_url = 'wss://{0}:{1}/agent/stomp/v1'.format(self.config.server_hostname, self.config.secured_url_port) - self.connection = security.establish_connection(connection_url) + connection_helper = security.VerifiedHTTPSConnection(self.config.server_hostname, connection_url, self.config) + self.connection = connection_helper.connect() def add_listeners(self): """ diff --git a/ambari-agent/src/main/python/ambari_agent/security.py b/ambari-agent/src/main/python/ambari_agent/security.py index 5fb21b0..215608e 100644 --- a/ambari-agent/src/main/python/ambari_agent/security.py +++ b/ambari-agent/src/main/python/ambari_agent/security.py @@ -42,12 +42,12 @@ GEN_AGENT_KEY = 'openssl req -new -newkey rsa:1024 -nodes -keyout "%(keysdir)s' KEY_FILENAME = '%(hostname)s.key' -class VerifiedHTTPSConnection(httplib.HTTPSConnection): +class VerifiedHTTPSConnection: 
""" Connecting using ssl wrapped sockets """ - def __init__(self, host, port=None, config=None): - httplib.HTTPSConnection.__init__(self, host, port=port) + def __init__(self, host, connection_url, config): self.two_way_ssl_required = False self.host = host + self.connection_url = connection_url self.config = config def connect(self): @@ -57,11 +57,11 @@ class VerifiedHTTPSConnection(httplib.HTTPSConnection): logger.info( 'Server require two-way SSL authentication. Use it instead of one-way...') + logging.info("Connecting to {0}".format(self.connection_url)) + + if not self.two_way_ssl_required: - sock = self.create_connection() - self.sock = ssl.wrap_socket(sock, cert_reqs=ssl.CERT_NONE) - logger.info('SSL connection established. Two-way SSL authentication is ' - 'turned off on the server.') + conn = AmbariStompConnection(self.connection_url) else: self.certMan = CertificateManager(self.config, self.host) self.certMan.initSecurity() @@ -69,67 +69,41 @@ class VerifiedHTTPSConnection(httplib.HTTPSConnection): agent_crt = self.certMan.getAgentCrtName() server_crt = self.certMan.getSrvrCrtName() - sock = self.create_connection() + ssl_options = { + 'keyfile': agent_key, + 'certfile': agent_crt, + 'cert_reqs': ssl.CERT_REQUIRED, + 'ca_certs': server_crt + } - try: - self.sock = ssl.wrap_socket(sock, - keyfile=agent_key, - certfile=agent_crt, - cert_reqs=ssl.CERT_REQUIRED, - ca_certs=server_crt) - logger.info('SSL connection established. Two-way SSL authentication ' - 'completed successfully.') - except ssl.SSLError as err: - logger.error('Two-way SSL authentication failed. Ensure that ' - 'server and agent certificates were signed by the same CA ' - 'and restart the agent. ' - '\nIn order to receive a new agent certificate, remove ' - 'existing certificate file from keys directory. 
As a ' - 'workaround you can turn off two-way SSL authentication in ' - 'server configuration(ambari.properties) ' - '\nExiting..') - raise err - - def create_connection(self): - if self.sock: - self.sock.close() - logger.info("SSL Connect being called.. connecting to the server") - sock = socket.create_connection((self.host, self.port), 60) - sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) - if self._tunnel_host: - self.sock = sock - self._tunnel() - - return sock - -def establish_connection(connection_url): - """ - Create a stomp connection - """ - logging.info("Connecting to {0}".format(connection_url)) - - conn = AmbariStompConnection(connection_url) - try: - conn.start() - conn.connect(wait=True) - except Exception as ex: - try: - conn.disconnect() - except: - logger.exception("Exception during conn.disconnect()") + conn = AmbariStompConnection(self.connection_url, ssl_options=ssl_options) - if isinstance(ex, socket_error): - logger.warn("Could not connect to {0}".format(connection_url)) + self.establish_connection(conn) + return conn + + def establish_connection(self, conn): + """ + Create a stomp connection + """ + try: + conn.start() + conn.connect(wait=True) + except Exception as ex: + try: + conn.disconnect() + except: + logger.exception("Exception during conn.disconnect()") - raise + if isinstance(ex, socket_error): + logger.warn("Could not connect to {0}. 
{1}".format(self.connection_url, str(ex))) - return conn + raise class AmbariStompConnection(WsConnection): - def __init__(self, url): + def __init__(self, *args, **kwargs): self.lock = threading.RLock() self.correlation_id = -1 - WsConnection.__init__(self, url) + WsConnection.__init__(self, *args, **kwargs) def send(self, destination, message, content_type=None, headers=None, **keyword_headers): with self.lock: diff --git a/ambari-agent/src/test/python/ambari_agent/TestSecurity.py b/ambari-agent/src/test/python/ambari_agent/TestSecurity.py index cd2a6c9..3387895 100644 --- a/ambari-agent/src/test/python/ambari_agent/TestSecurity.py +++ b/ambari-agent/src/test/python/ambari_agent/TestSecurity.py @@ -59,59 +59,6 @@ class TestSecurity(unittest.TestCase): def tearDown(self): # enable stdout sys.stdout = sys.__stdout__ - - - ### VerifiedHTTPSConnection ### - - @patch.object(security.CertificateManager, "initSecurity") - @patch("socket.create_connection") - @patch("ssl.wrap_socket") - def test_VerifiedHTTPSConnection_connect(self, wrap_socket_mock, - create_connection_mock, - init_security_mock): - init_security_mock.return_value = None - self.config.set('security', 'keysdir', '/dummy-keysdir') - connection = security.VerifiedHTTPSConnection("example.com", - self.config.get('server', 'secured_url_port'), self.config) - connection._tunnel_host = False - connection.sock = None - connection.connect() - self.assertTrue(wrap_socket_mock.called) - - ### VerifiedHTTPSConnection with no certificates creation - @patch.object(security.CertificateManager, "initSecurity") - @patch("socket.create_connection") - @patch("ssl.wrap_socket") - def test_Verified_HTTPSConnection_non_secure_connect(self, wrap_socket_mock, - create_connection_mock, - init_security_mock): - connection = security.VerifiedHTTPSConnection("example.com", - self.config.get('server', 'secured_url_port'), self.config) - connection._tunnel_host = False - connection.sock = None - connection.connect() - 
self.assertFalse(init_security_mock.called) - - ### VerifiedHTTPSConnection with two-way SSL authentication enabled - @patch.object(security.CertificateManager, "initSecurity") - @patch("socket.create_connection") - @patch("ssl.wrap_socket") - def test_Verified_HTTPSConnection_two_way_ssl_connect(self, wrap_socket_mock, - create_connection_mock, - init_security_mock): - wrap_socket_mock.side_effect=ssl.SSLError() - connection = security.VerifiedHTTPSConnection("example.com", - self.config.get('server', 'secured_url_port'), self.config) - self.config.isTwoWaySSLConnection = MagicMock(return_value=True) - - connection._tunnel_host = False - connection.sock = None - try: - connection.connect() - except ssl.SSLError: - pass - self.assertTrue(init_security_mock.called) - ### CachedHTTPSConnection ### @patch.object(security.VerifiedHTTPSConnection, "connect") diff --git a/ambari-common/src/main/python/ambari_stomp/adapter/websocket.py b/ambari-common/src/main/python/ambari_stomp/adapter/websocket.py index 6cf19db..ad61866 100644 --- a/ambari-common/src/main/python/ambari_stomp/adapter/websocket.py +++ b/ambari-common/src/main/python/ambari_stomp/adapter/websocket.py @@ -64,11 +64,11 @@ class QueuedWebSocketClient(WebSocketClient): self.messages.put(StopIteration) class WsTransport(Transport): - def __init__(self, url): + def __init__(self, url, ssl_options=None): Transport.__init__(self, (0, 0), False, False, 0.0, 0.0, 0.0, 0.0, 0, False, None, None, None, None, False, DEFAULT_SSL_VERSION, None, None, None) self.current_host_and_port = (0, 0) # mocking - self.ws = QueuedWebSocketClient(url, protocols=['http-only', 'chat']) + self.ws = QueuedWebSocketClient(url, protocols=['http-only', 'chat'], ssl_options=ssl_options) self.ws.daemon = False def wait_for_connection(self, timeout=DEFAULT_CONNECTION_TIMEOUT): @@ -124,8 +124,8 @@ class WsTransport(Transport): logger.exception("Exception during Transport.stop(self)") class WsConnection(BaseConnection, Protocol12): - def 
__init__(self, url): - self.transport = WsTransport(url) + def __init__(self, url, ssl_options=None): + self.transport = WsTransport(url, ssl_options=ssl_options) self.transport.set_listener('ws-listener', self) self.transactions = {} Protocol12.__init__(self, self.transport, (0, 0)) -- To stop receiving notification emails like this one, please contact aonis...@apache.org.