pankajastro commented on code in PR #24550:
URL: https://github.com/apache/airflow/pull/24550#discussion_r905112068


##########
airflow/providers/apache/hdfs/hooks/webhdfs.py:
##########
@@ -63,39 +63,51 @@ def get_conn(self) -> Any:
         """
         connection = self._find_valid_server()
         if connection is None:
-            raise AirflowWebHDFSHookException("Failed to locate the valid 
server.")
+            raise AirflowWebHDFSHookException(
+                "Failed to locate the valid server.")
         return connection
 
     def _find_valid_server(self) -> Any:
         connection = self.get_connection(self.webhdfs_conn_id)
         namenodes = connection.host.split(',')
         for namenode in namenodes:
             host_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-            self.log.info("Trying to connect to %s:%s", namenode, 
connection.port)
+            self.log.info("Trying to connect to %s:%s",
+                          namenode, connection.port)
             try:
-                conn_check = host_socket.connect_ex((namenode, 
connection.port))
+                conn_check = host_socket.connect_ex(
+                    (namenode, connection.port))
                 if conn_check == 0:
                     self.log.info('Trying namenode %s', namenode)
                     client = self._get_client(
-                        namenode, connection.port, connection.login, 
connection.extra_dejson
+                        namenode, connection.port, connection.login, 
connection.schema,
+                        connection.extra_dejson
                     )
                     client.status('/')
                     self.log.info('Using namenode %s for hook', namenode)
                     host_socket.close()
                     return client
                 else:
-                    self.log.warning("Could not connect to %s:%s", namenode, 
connection.port)
+                    self.log.warning("Could not connect to %s:%s",
+                                     namenode, connection.port)
             except HdfsError as hdfs_error:
-                self.log.info('Read operation on namenode %s failed with 
error: %s', namenode, hdfs_error)
+                self.log.info(
+                    'Read operation on namenode %s failed with error: %s', 
namenode, hdfs_error)
         return None
 
-    def _get_client(self, namenode: str, port: int, login: str, extra_dejson: 
dict) -> Any:
-        connection_str = f'http://{namenode}:{port}'
+    def _get_client(self, namenode: str, port: int, login: str, schema: str, 
extra_dejson: dict) -> Any:
+        connection_str = f'http://{namenode}'
         session = requests.Session()
 
-        if extra_dejson.get('use_ssl', False):
-            connection_str = f'https://{namenode}:{port}'
-            session.verify = extra_dejson.get('verify', True)
+        if extra_dejson.get('use_ssl', 'False') == 'True':

Review Comment:
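   I think connection extras can hold either strings or JSON values (e.g. booleans), so I'm not sure this change is actually needed; see 
https://github.com/apache/airflow/blob/main/airflow/models/connection.py#L119 . 
If `use_ssl` is stored as a JSON boolean (e.g. `{"use_ssl": true}`), `extra_dejson.get('use_ssl')` returns `True`, and `True == 'True'` is False, so the new check would silently disable SSL for those connections.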



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to