kaxil commented on a change in pull request #7454: [AIRFLOW-6833] HA for 
webhdfs connection
URL: https://github.com/apache/airflow/pull/7454#discussion_r397873922

 File path: airflow/providers/apache/hdfs/hooks/webhdfs.py
 @@ -56,27 +57,35 @@ def __init__(self, webhdfs_conn_id='webhdfs_default', 
     def get_conn(self):
         Establishes a connection depending on the security mode set via config 
or environment variable.
         :return: a hdfscli InsecureClient or KerberosClient object.
         :rtype: hdfs.InsecureClient or hdfs.ext.kerberos.KerberosClient
-        connections = self.get_connections(self.webhdfs_conn_id)
+        connection = self._find_valid_server()
+        if connection is None:
+            raise AirflowWebHDFSHookException("Failed to locate the valid 
+        return connection
+    def _find_valid_server(self):
+        connections = self.get_connections(self.webhdfs_conn_id)
         for connection in connections:
+            host_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+            self.log.info("Trying to connect to %s:%s", connection.host, 
-                self.log.debug('Trying namenode %s', connection.host)
-                client = self._get_client(connection)
-                client.status('/')
-                self.log.debug('Using namenode %s for hook', connection.host)
-                return client
+                conn_check = host_socket.connect_ex((connection.host, 
+                if conn_check == 0:
+                    self.log.info('Trying namenode %s', connection.host)
+                    client = self._get_client(connection)
+                    client.status('/')
+                    self.log.info('Using namenode %s for hook', 
+                    host_socket.close()
+                    return client
+                else:
+                    self.log.info("Could not connect to %s:%s", 
connection.host, connection.port)
+                host_socket.close()
             except HdfsError as hdfs_error:
-                self.log.debug('Read operation on namenode %s failed with 
error: %s',
-                               connection.host, hdfs_error)
-        hosts = [connection.host for connection in connections]
-        error_message = 'Read operations failed on the namenodes 
-            hosts='\n'.join(hosts))
-        raise AirflowWebHDFSHookException(error_message)
+                self.log.info('Read operation on namenode %s failed with 
error: %s',
 Review comment:
  Wait, if there is an HDFS error, doesn't the host_socket need to be closed? 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:

With regards,
Apache Git Services

Reply via email to