ashb commented on a change in pull request #7454: [AIRFLOW-6833] HA for webhdfs
connection
URL: https://github.com/apache/airflow/pull/7454#discussion_r384471488
##########
File path: airflow/providers/apache/hdfs/hooks/webhdfs.py
##########
@@ -56,27 +57,34 @@ def __init__(self, webhdfs_conn_id='webhdfs_default',
proxy_user=None):
def get_conn(self):
"""
Establishes a connection depending on the security mode set via config
or environment variable.
-
:return: a hdfscli InsecureClient or KerberosClient object.
:rtype: hdfs.InsecureClient or hdfs.ext.kerberos.KerberosClient
"""
- connections = self.get_connections(self.webhdfs_conn_id)
+ connection = self._find_valid_server()
+ if connection is None:
+ raise AirflowWebHDFSHookException("Failed to locate the valid
server.")
+ return connection
+ def _find_valid_server(self):
+ connections = self.get_connections(self.webhdfs_conn_id)
for connection in connections:
+ host_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self.log.info("Trying to connect to %s:%s", connection.host,
connection.port)
try:
- self.log.debug('Trying namenode %s', connection.host)
- client = self._get_client(connection)
- client.status('/')
- self.log.debug('Using namenode %s for hook', connection.host)
- return client
+ conn_check = host_socket.connect_ex((connection.host,
connection.port))
+ if conn_check == 0:
+ self.log.info('Trying namenode %s', connection.host)
+ client = self._get_client(connection)
+ client.status('/')
+ self.log.info('Using namenode %s for hook',
connection.host)
+ host_socket.close()
+ return client
+ else:
+ self.log.info("Could not connect to %s:%s",
connection.host, connection.port)
except HdfsError as hdfs_error:
- self.log.debug('Read operation on namenode %s failed with
error: %s',
- connection.host, hdfs_error)
-
- hosts = [connection.host for connection in connections]
- error_message = 'Read operations failed on the namenodes
below:\n{hosts}'.format(
- hosts='\n'.join(hosts))
- raise AirflowWebHDFSHookException(error_message)
+ self.log.info('Read operation on namenode %s failed with
error: %s',
+ connection.host, hdfs_error)
+ return None
Review comment:
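`host_socket` is only closed on the success path, right before `return client`. A new socket is created on every loop iteration, so it is leaked whenever `connect_ex` returns a non-zero code or an `HdfsError` is raised, not just when no connection succeeds at all. Please close it in a `finally` block (or use the socket as a context manager) so it is released on every path.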
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services