kaxil commented on a change in pull request #7454: [AIRFLOW-6833] HA for webhdfs connection
URL: https://github.com/apache/airflow/pull/7454#discussion_r397996727
 
 

 ##########
 File path: airflow/providers/apache/hdfs/hooks/webhdfs.py
 ##########
 @@ -56,27 +57,35 @@ def __init__(self, webhdfs_conn_id='webhdfs_default', 
proxy_user=None):
     def get_conn(self):
         """
         Establishes a connection depending on the security mode set via config 
or environment variable.
-
         :return: a hdfscli InsecureClient or KerberosClient object.
         :rtype: hdfs.InsecureClient or hdfs.ext.kerberos.KerberosClient
         """
-        connections = self.get_connections(self.webhdfs_conn_id)
+        connection = self._find_valid_server()
+        if connection is None:
+            raise AirflowWebHDFSHookException("Failed to locate the valid 
server.")
+        return connection
 
+    def _find_valid_server(self):
+        connections = self.get_connections(self.webhdfs_conn_id)
         for connection in connections:
+            host_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+            self.log.info("Trying to connect to %s:%s", connection.host, 
connection.port)
             try:
-                self.log.debug('Trying namenode %s', connection.host)
-                client = self._get_client(connection)
-                client.status('/')
-                self.log.debug('Using namenode %s for hook', connection.host)
-                return client
+                conn_check = host_socket.connect_ex((connection.host, 
connection.port))
+                if conn_check == 0:
+                    self.log.info('Trying namenode %s', connection.host)
+                    client = self._get_client(connection)
+                    client.status('/')
+                    self.log.info('Using namenode %s for hook', 
connection.host)
+                    host_socket.close()
+                    return client
+                else:
+                    self.log.info("Could not connect to %s:%s", 
connection.host, connection.port)
+                host_socket.close()
             except HdfsError as hdfs_error:
-                self.log.debug('Read operation on namenode %s failed with 
error: %s',
-                               connection.host, hdfs_error)
-
-        hosts = [connection.host for connection in connections]
-        error_message = 'Read operations failed on the namenodes 
below:\n{hosts}'.format(
-            hosts='\n'.join(hosts))
-        raise AirflowWebHDFSHookException(error_message)
+                self.log.info('Read operation on namenode %s failed with 
error: %s',
 
 Review comment:
   Got it

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to