pankajastro commented on code in PR #24550:
URL: https://github.com/apache/airflow/pull/24550#discussion_r905112068
##########
airflow/providers/apache/hdfs/hooks/webhdfs.py:
##########
@@ -63,39 +63,51 @@ def get_conn(self) -> Any:
"""
connection = self._find_valid_server()
if connection is None:
- raise AirflowWebHDFSHookException("Failed to locate the valid
server.")
+ raise AirflowWebHDFSHookException(
+ "Failed to locate the valid server.")
return connection
def _find_valid_server(self) -> Any:
connection = self.get_connection(self.webhdfs_conn_id)
namenodes = connection.host.split(',')
for namenode in namenodes:
host_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- self.log.info("Trying to connect to %s:%s", namenode,
connection.port)
+ self.log.info("Trying to connect to %s:%s",
+ namenode, connection.port)
try:
- conn_check = host_socket.connect_ex((namenode,
connection.port))
+ conn_check = host_socket.connect_ex(
+ (namenode, connection.port))
if conn_check == 0:
self.log.info('Trying namenode %s', namenode)
client = self._get_client(
- namenode, connection.port, connection.login,
connection.extra_dejson
+ namenode, connection.port, connection.login,
connection.schema,
+ connection.extra_dejson
)
client.status('/')
self.log.info('Using namenode %s for hook', namenode)
host_socket.close()
return client
else:
- self.log.warning("Could not connect to %s:%s", namenode,
connection.port)
+ self.log.warning("Could not connect to %s:%s",
+ namenode, connection.port)
except HdfsError as hdfs_error:
- self.log.info('Read operation on namenode %s failed with
error: %s', namenode, hdfs_error)
+ self.log.info(
+ 'Read operation on namenode %s failed with error: %s',
namenode, hdfs_error)
return None
- def _get_client(self, namenode: str, port: int, login: str, extra_dejson:
dict) -> Any:
- connection_str = f'http://{namenode}:{port}'
+ def _get_client(self, namenode: str, port: int, login: str, schema: str,
extra_dejson: dict) -> Any:
+ connection_str = f'http://{namenode}'
session = requests.Session()
- if extra_dejson.get('use_ssl', False):
- connection_str = f'https://{namenode}:{port}'
- session.verify = extra_dejson.get('verify', True)
+ if extra_dejson.get('use_ssl', 'False') == 'True':
Review Comment:
I think the connection extra can be a string as well as a dict/JSON, so I am
not sure whether this change is actually required or not. See
https://github.com/apache/airflow/blob/main/airflow/models/connection.py#L119 .
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]