This is an automated email from the ASF dual-hosted git repository.
pankajkoti pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new f5c8059553 sftp_sensor: fixing resource management with sensor (#40022)
f5c8059553 is described below
commit f5c80595532299f31d911823a64c9730a838b4d7
Author: Ashish Patel <[email protected]>
AuthorDate: Fri Jun 7 18:24:32 2024 +0530
sftp_sensor: fixing resource management with sensor (#40022)
closes: #39922
Summary
When SFTPSensor is used with deferrable=True together with path/newer_than,
it opens an SFTP connection that is never closed. The cause is that the
get_mod_time method opens an SFTP connection but does not close it
afterward.
As part of this change, we are closing the connection.
---
airflow/providers/sftp/hooks/sftp.py | 10 +++++++---
airflow/providers/sftp/sensors/sftp.py | 13 +++++++++++++
2 files changed, 20 insertions(+), 3 deletions(-)
diff --git a/airflow/providers/sftp/hooks/sftp.py b/airflow/providers/sftp/hooks/sftp.py
index 0907fba2eb..7a2e34a215 100644
--- a/airflow/providers/sftp/hooks/sftp.py
+++ b/airflow/providers/sftp/hooks/sftp.py
@@ -549,7 +549,7 @@ class SFTPHookAsync(BaseHook):
matched_files = [file for file in files_list if
fnmatch(str(file.filename), fnmatch_pattern)]
return matched_files
- async def get_mod_time(self, path: str) -> str:
+ async def get_mod_time(self, path: str) -> str: # type: ignore[return]
"""
Make SFTP async connection.
@@ -558,9 +558,10 @@ class SFTPHookAsync(BaseHook):
:param path: full path to the remote file
"""
- ssh_conn = await self._get_conn()
- sftp_client = await ssh_conn.start_sftp_client()
+ ssh_conn = None
try:
+ ssh_conn = await self._get_conn()
+ sftp_client = await ssh_conn.start_sftp_client()
ftp_mdtm = await sftp_client.stat(path)
modified_time = ftp_mdtm.mtime
mod_time =
datetime.datetime.fromtimestamp(modified_time).strftime("%Y%m%d%H%M%S") #
type: ignore[arg-type]
@@ -568,3 +569,6 @@ class SFTPHookAsync(BaseHook):
return mod_time
except asyncssh.SFTPNoSuchFile:
raise AirflowException("No files matching")
+ finally:
+ if ssh_conn:
+ ssh_conn.close()
diff --git a/airflow/providers/sftp/sensors/sftp.py b/airflow/providers/sftp/sensors/sftp.py
index de3870937d..f56ad93410 100644
--- a/airflow/providers/sftp/sensors/sftp.py
+++ b/airflow/providers/sftp/sensors/sftp.py
@@ -111,6 +111,19 @@ class SFTPSensor(BaseSensorOperator):
_newer_than = convert_to_utc(self.newer_than)
if _newer_than <= _mod_time:
files_found.append(actual_file_to_check)
+ self.log.info(
+ "File %s has modification time: '%s', which is newer
than: '%s'",
+ actual_file_to_check,
+ str(_mod_time),
+ str(_newer_than),
+ )
+ else:
+ self.log.info(
+ "File %s has modification time: '%s', which is older
than: '%s'",
+ actual_file_to_check,
+ str(_mod_time),
+ str(_newer_than),
+ )
else:
files_found.append(actual_file_to_check)