This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new ac3090d7768 Checking modification timestamps only when newer_than
parameter is present (#48063)
ac3090d7768 is described below
commit ac3090d7768eafcefc680f6ad7de7daada4382e8
Author: Kacper Kulczak <[email protected]>
AuthorDate: Mon Mar 31 04:40:49 2025 +0200
Checking modification timestamps only when newer_than parameter is present
(#48063)
Checking modification timestamps has a significant impact on the performance of
sensors when there is a large number of files.
---
.../src/airflow/providers/sftp/sensors/sftp.py | 24 ++++++----
.../sftp/tests/unit/sftp/sensors/test_sftp.py | 54 +++++++++++++++-------
2 files changed, 53 insertions(+), 25 deletions(-)
diff --git a/providers/sftp/src/airflow/providers/sftp/sensors/sftp.py
b/providers/sftp/src/airflow/providers/sftp/sensors/sftp.py
index ca305098914..df4c7c5a611 100644
--- a/providers/sftp/src/airflow/providers/sftp/sensors/sftp.py
+++ b/providers/sftp/src/airflow/providers/sftp/sensors/sftp.py
@@ -95,18 +95,24 @@ class SFTPSensor(BaseSensorOperator):
else:
return False
else:
- actual_files_to_check = [self.path]
-
- for actual_file_to_check in actual_files_to_check:
try:
- mod_time = self.hook.get_mod_time(actual_file_to_check)
- self.log.info("Found File %s last modified: %s",
actual_file_to_check, mod_time)
+ self.hook.isfile(self.path)
+ actual_files_to_check = [self.path]
except OSError as e:
if e.errno != SFTP_NO_SUCH_FILE:
raise AirflowException from e
- continue
+ actual_files_to_check = []
+
+ if self.newer_than:
+ for actual_file_to_check in actual_files_to_check:
+ try:
+ mod_time = self.hook.get_mod_time(actual_file_to_check)
+ self.log.info("Found File %s last modified: %s",
actual_file_to_check, mod_time)
+ except OSError as e:
+ if e.errno != SFTP_NO_SUCH_FILE:
+ raise AirflowException from e
+ continue
- if self.newer_than:
if isinstance(self.newer_than, str):
self.newer_than = parse(self.newer_than)
_mod_time = convert_to_utc(datetime.strptime(mod_time,
"%Y%m%d%H%M%S"))
@@ -126,8 +132,8 @@ class SFTPSensor(BaseSensorOperator):
str(_mod_time),
str(_newer_than),
)
- else:
- files_found.append(actual_file_to_check)
+ else:
+ files_found = actual_files_to_check
if not len(files_found):
return False
diff --git a/providers/sftp/tests/unit/sftp/sensors/test_sftp.py
b/providers/sftp/tests/unit/sftp/sensors/test_sftp.py
index 8366a8817c6..644b876c367 100644
--- a/providers/sftp/tests/unit/sftp/sensors/test_sftp.py
+++ b/providers/sftp/tests/unit/sftp/sensors/test_sftp.py
@@ -19,7 +19,7 @@ from __future__ import annotations
from datetime import datetime, timezone as stdlib_timezone
from unittest import mock
-from unittest.mock import Mock, call, patch
+from unittest.mock import Mock, patch
import pytest
from paramiko.sftp import SFTP_FAILURE, SFTP_NO_SUCH_FILE
@@ -36,27 +36,27 @@ from airflow.sensors.base import PokeReturnValue
class TestSFTPSensor:
@patch("airflow.providers.sftp.sensors.sftp.SFTPHook")
def test_file_present(self, sftp_hook_mock):
- sftp_hook_mock.return_value.get_mod_time.return_value =
"19700101000000"
+ sftp_hook_mock.return_value.isfile.return_value = True
sftp_sensor = SFTPSensor(task_id="unit_test",
path="/path/to/file/1970-01-01.txt")
context = {"ds": "1970-01-01"}
output = sftp_sensor.poke(context)
-
sftp_hook_mock.return_value.get_mod_time.assert_called_once_with("/path/to/file/1970-01-01.txt")
+
sftp_hook_mock.return_value.isfile.assert_called_once_with("/path/to/file/1970-01-01.txt")
sftp_hook_mock.return_value.close_conn.assert_not_called()
assert output
@patch("airflow.providers.sftp.sensors.sftp.SFTPHook")
def test_file_absent(self, sftp_hook_mock):
- sftp_hook_mock.return_value.get_mod_time.side_effect =
OSError(SFTP_NO_SUCH_FILE, "File missing")
+ sftp_hook_mock.return_value.isfile.side_effect =
OSError(SFTP_NO_SUCH_FILE, "File missing")
sftp_sensor = SFTPSensor(task_id="unit_test",
path="/path/to/file/1970-01-01.txt")
context = {"ds": "1970-01-01"}
output = sftp_sensor.poke(context)
-
sftp_hook_mock.return_value.get_mod_time.assert_called_once_with("/path/to/file/1970-01-01.txt")
+
sftp_hook_mock.return_value.isfile.assert_called_once_with("/path/to/file/1970-01-01.txt")
sftp_hook_mock.return_value.close_conn.assert_not_called()
assert not output
@patch("airflow.providers.sftp.sensors.sftp.SFTPHook")
def test_sftp_failure(self, sftp_hook_mock):
- sftp_hook_mock.return_value.get_mod_time.side_effect =
OSError(SFTP_FAILURE, "SFTP failure")
+ sftp_hook_mock.return_value.isfile.side_effect = OSError(SFTP_FAILURE,
"SFTP failure")
sftp_sensor = SFTPSensor(task_id="unit_test",
path="/path/to/file/1970-01-01.txt")
context = {"ds": "1970-01-01"}
@@ -69,6 +69,7 @@ class TestSFTPSensor:
@patch("airflow.providers.sftp.sensors.sftp.SFTPHook")
def test_file_new_enough(self, sftp_hook_mock):
+ sftp_hook_mock.return_value.isfile.return_value = True
sftp_hook_mock.return_value.get_mod_time.return_value =
"19700101000000"
tz = timezone("America/Toronto")
sftp_sensor = SFTPSensor(
@@ -78,12 +79,14 @@ class TestSFTPSensor:
)
context = {"ds": "1970-01-00"}
output = sftp_sensor.poke(context)
+
sftp_hook_mock.return_value.isfile.assert_called_once_with("/path/to/file/1970-01-01.txt")
sftp_hook_mock.return_value.get_mod_time.assert_called_once_with("/path/to/file/1970-01-01.txt")
sftp_hook_mock.return_value.close_conn.assert_not_called()
assert output
@patch("airflow.providers.sftp.sensors.sftp.SFTPHook")
def test_file_not_new_enough(self, sftp_hook_mock):
+ sftp_hook_mock.return_value.isfile.return_value = True
sftp_hook_mock.return_value.get_mod_time.return_value =
"19700101000000"
tz = timezone("Europe/Paris")
sftp_sensor = SFTPSensor(
@@ -93,6 +96,7 @@ class TestSFTPSensor:
)
context = {"ds": "1970-01-00"}
output = sftp_sensor.poke(context)
+
sftp_hook_mock.return_value.isfile.assert_called_once_with("/path/to/file/1970-01-01.txt")
sftp_hook_mock.return_value.get_mod_time.assert_called_once_with("/path/to/file/1970-01-01.txt")
sftp_hook_mock.return_value.close_conn.assert_not_called()
assert not output
@@ -113,30 +117,29 @@ class TestSFTPSensor:
)
@patch("airflow.providers.sftp.sensors.sftp.SFTPHook")
def test_multiple_datetime_format_in_newer_than(self, sftp_hook_mock,
newer_than):
+ sftp_hook_mock.return_value.isfile.return_value = True
sftp_hook_mock.return_value.get_mod_time.return_value =
"19700101000000"
sftp_sensor = SFTPSensor(
task_id="unit_test", path="/path/to/file/1970-01-01.txt",
newer_than=newer_than
)
context = {"ds": "1970-01-00"}
output = sftp_sensor.poke(context)
+
sftp_hook_mock.return_value.isfile.assert_called_once_with("/path/to/file/1970-01-01.txt")
sftp_hook_mock.return_value.get_mod_time.assert_called_once_with("/path/to/file/1970-01-01.txt")
sftp_hook_mock.return_value.close_conn.assert_not_called()
assert not output
@patch("airflow.providers.sftp.sensors.sftp.SFTPHook")
def test_file_present_with_pattern(self, sftp_hook_mock):
- sftp_hook_mock.return_value.get_mod_time.return_value =
"19700101000000"
sftp_hook_mock.return_value.get_files_by_pattern.return_value =
["text_file.txt"]
sftp_sensor = SFTPSensor(task_id="unit_test", path="/path/to/file/",
file_pattern="*.txt")
context = {"ds": "1970-01-01"}
output = sftp_sensor.poke(context)
-
sftp_hook_mock.return_value.get_mod_time.assert_called_once_with("/path/to/file/text_file.txt")
sftp_hook_mock.return_value.close_conn.assert_not_called()
assert output
@patch("airflow.providers.sftp.sensors.sftp.SFTPHook")
def test_file_not_present_with_pattern(self, sftp_hook_mock):
- sftp_hook_mock.return_value.get_mod_time.return_value =
"19700101000000"
sftp_hook_mock.return_value.get_files_by_pattern.return_value = []
sftp_sensor = SFTPSensor(task_id="unit_test", path="/path/to/file/",
file_pattern="*.txt")
context = {"ds": "1970-01-01"}
@@ -146,7 +149,6 @@ class TestSFTPSensor:
@patch("airflow.providers.sftp.sensors.sftp.SFTPHook")
def test_multiple_files_present_with_pattern(self, sftp_hook_mock):
- sftp_hook_mock.return_value.get_mod_time.return_value =
"19700101000000"
sftp_hook_mock.return_value.get_files_by_pattern.return_value = [
"text_file.txt",
"another_text_file.txt",
@@ -154,10 +156,7 @@ class TestSFTPSensor:
sftp_sensor = SFTPSensor(task_id="unit_test", path="/path/to/file/",
file_pattern="*.txt")
context = {"ds": "1970-01-01"}
output = sftp_sensor.poke(context)
- get_mod_time = sftp_hook_mock.return_value.get_mod_time
- expected_calls = [call("/path/to/file/text_file.txt"),
call("/path/to/file/another_text_file.txt")]
sftp_hook_mock.return_value.close_conn.assert_not_called()
- assert get_mod_time.mock_calls == expected_calls
assert output
@patch("airflow.providers.sftp.sensors.sftp.SFTPHook")
@@ -227,7 +226,7 @@ class TestSFTPSensor:
)
@patch("airflow.providers.sftp.sensors.sftp.SFTPHook")
def test_file_path_present_with_callback(self, sftp_hook_mock, op_args,
op_kwargs):
- sftp_hook_mock.return_value.get_mod_time.return_value =
"19700101000000"
+ sftp_hook_mock.return_value.isfile.return_value = True
sample_callable = Mock()
sample_callable.return_value = ["sample_return"]
sftp_sensor = SFTPSensor(
@@ -240,7 +239,7 @@ class TestSFTPSensor:
context = {"ds": "1970-01-01"}
output = sftp_sensor.poke(context)
-
sftp_hook_mock.return_value.get_mod_time.assert_called_once_with("/path/to/file/1970-01-01.txt")
+
sftp_hook_mock.return_value.isfile.assert_called_once_with("/path/to/file/1970-01-01.txt")
sftp_hook_mock.return_value.close_conn.assert_not_called()
sample_callable.assert_called_once_with(*op_args, **op_kwargs)
assert isinstance(output, PokeReturnValue)
@@ -259,7 +258,6 @@ class TestSFTPSensor:
)
@patch("airflow.providers.sftp.sensors.sftp.SFTPHook")
def test_file_pattern_present_with_callback(self, sftp_hook_mock, op_args,
op_kwargs):
- sftp_hook_mock.return_value.get_mod_time.return_value =
"19700101000000"
sample_callable = Mock()
sample_callable.return_value = ["sample_return"]
sftp_hook_mock.return_value.get_files_by_pattern.return_value = [
@@ -285,3 +283,27 @@ class TestSFTPSensor:
"files_found": ["/path/to/file/text_file.txt",
"/path/to/file/another_text_file.txt"],
"decorator_return_value": ["sample_return"],
}
+
+ @patch("airflow.providers.sftp.sensors.sftp.SFTPHook")
+ def test_mod_time_called_when_newer_than_set(self, sftp_hook_mock):
+ sftp_hook_mock.return_value.isfile.return_value = True
+ sftp_hook_mock.return_value.get_mod_time.return_value =
"19700101000000"
+ tz = timezone("America/Toronto")
+ sftp_sensor = SFTPSensor(
+ task_id="unit_test",
+ path="/path/to/file/1970-01-01.txt",
+ newer_than=tz.convert(datetime(1960, 1, 2)),
+ )
+ context = {"ds": "1970-01-01"}
+ sftp_sensor.poke(context)
+
sftp_hook_mock.return_value.isfile.assert_called_once_with("/path/to/file/1970-01-01.txt")
+
sftp_hook_mock.return_value.get_mod_time.assert_called_once_with("/path/to/file/1970-01-01.txt")
+
+ @patch("airflow.providers.sftp.sensors.sftp.SFTPHook")
+ def test_mod_time_not_called_when_newer_than_not_set(self, sftp_hook_mock):
+ sftp_hook_mock.return_value.isfile.return_value = True
+ sftp_hook_mock.return_value.get_mod_time.return_value =
"19700101000000"
+ sftp_sensor = SFTPSensor(task_id="unit_test",
path="/path/to/file/1970-01-01.txt")
+ context = {"ds": "1970-01-01"}
+ sftp_sensor.poke(context)
+ sftp_hook_mock.return_value.get_mod_time.assert_not_called()