uranusjr commented on a change in pull request #19711:
URL: https://github.com/apache/airflow/pull/19711#discussion_r755368679



##########
File path: airflow/providers/apache/hdfs/hooks/webhdfs.py
##########
@@ -71,30 +71,31 @@ def get_conn(self) -> Any:
 
     def _find_valid_server(self) -> Any:
         connection = self.get_connection(self.webhdfs_conn_id)
-        host_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-        self.log.info("Trying to connect to %s:%s", connection.host, 
connection.port)
-        try:
-            conn_check = host_socket.connect_ex((connection.host, 
connection.port))
-            if conn_check == 0:
-                self.log.info('Trying namenode %s', connection.host)
-                client = self._get_client(connection)
-                client.status('/')
-                self.log.info('Using namenode %s for hook', connection.host)
-                host_socket.close()
-                return client
-            else:
-                self.log.error("Could not connect to %s:%s", connection.host, 
connection.port)
-            host_socket.close()
-        except HdfsError as hdfs_error:
-            self.log.error('Read operation on namenode %s failed with error: 
%s', connection.host, hdfs_error)
+        namenodes = connection.host.split(',')
+        for namenode in namenodes:
+            host_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+            self.log.info("Trying to connect to %s:%s", namenode, 
connection.port)
+            try:
+                conn_check = host_socket.connect_ex((namenode, 
connection.port))
+                if conn_check == 0:
+                    self.log.info('Trying namenode %s', namenode)
+                    client = self._get_client(connection, namenode)
+                    client.status('/')
+                    self.log.info('Using namenode %s for hook', namenode)
+                    host_socket.close()
+                    return client
+                else:
+                    self.log.error("Could not connect to %s:%s", namenode, 
connection.port)
+            except HdfsError as hdfs_error:
+                self.log.error('Read operation on namenode %s failed with 
error: %s', namenode, hdfs_error)
         return None
 
-    def _get_client(self, connection: Connection) -> Any:
-        connection_str = f'http://{connection.host}:{connection.port}'
+    def _get_client(self, connection: Connection, namenode: str) -> Any:

Review comment:
       It may be better to pass in `namenode`, `port`, and `extras` separately
instead of the whole `connection`, to avoid `connection.host` (which may now be a
comma-separated list of namenodes) being incorrectly used.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to