This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new ac3090d7768 Checking modification timestamps only when newer_than 
parameter is present (#48063)
ac3090d7768 is described below

commit ac3090d7768eafcefc680f6ad7de7daada4382e8
Author: Kacper Kulczak <[email protected]>
AuthorDate: Mon Mar 31 04:40:49 2025 +0200

    Checking modification timestamps only when newer_than parameter is present 
(#48063)
    
    Checking modification timestamps has a significant impact on the performance of 
sensors in case of a large number of files.
---
 .../src/airflow/providers/sftp/sensors/sftp.py     | 24 ++++++----
 .../sftp/tests/unit/sftp/sensors/test_sftp.py      | 54 +++++++++++++++-------
 2 files changed, 53 insertions(+), 25 deletions(-)

diff --git a/providers/sftp/src/airflow/providers/sftp/sensors/sftp.py 
b/providers/sftp/src/airflow/providers/sftp/sensors/sftp.py
index ca305098914..df4c7c5a611 100644
--- a/providers/sftp/src/airflow/providers/sftp/sensors/sftp.py
+++ b/providers/sftp/src/airflow/providers/sftp/sensors/sftp.py
@@ -95,18 +95,24 @@ class SFTPSensor(BaseSensorOperator):
             else:
                 return False
         else:
-            actual_files_to_check = [self.path]
-
-        for actual_file_to_check in actual_files_to_check:
             try:
-                mod_time = self.hook.get_mod_time(actual_file_to_check)
-                self.log.info("Found File %s last modified: %s", 
actual_file_to_check, mod_time)
+                self.hook.isfile(self.path)
+                actual_files_to_check = [self.path]
             except OSError as e:
                 if e.errno != SFTP_NO_SUCH_FILE:
                     raise AirflowException from e
-                continue
+                actual_files_to_check = []
+
+        if self.newer_than:
+            for actual_file_to_check in actual_files_to_check:
+                try:
+                    mod_time = self.hook.get_mod_time(actual_file_to_check)
+                    self.log.info("Found File %s last modified: %s", 
actual_file_to_check, mod_time)
+                except OSError as e:
+                    if e.errno != SFTP_NO_SUCH_FILE:
+                        raise AirflowException from e
+                    continue
 
-            if self.newer_than:
                 if isinstance(self.newer_than, str):
                     self.newer_than = parse(self.newer_than)
                 _mod_time = convert_to_utc(datetime.strptime(mod_time, 
"%Y%m%d%H%M%S"))
@@ -126,8 +132,8 @@ class SFTPSensor(BaseSensorOperator):
                         str(_mod_time),
                         str(_newer_than),
                     )
-            else:
-                files_found.append(actual_file_to_check)
+        else:
+            files_found = actual_files_to_check
 
         if not len(files_found):
             return False
diff --git a/providers/sftp/tests/unit/sftp/sensors/test_sftp.py 
b/providers/sftp/tests/unit/sftp/sensors/test_sftp.py
index 8366a8817c6..644b876c367 100644
--- a/providers/sftp/tests/unit/sftp/sensors/test_sftp.py
+++ b/providers/sftp/tests/unit/sftp/sensors/test_sftp.py
@@ -19,7 +19,7 @@ from __future__ import annotations
 
 from datetime import datetime, timezone as stdlib_timezone
 from unittest import mock
-from unittest.mock import Mock, call, patch
+from unittest.mock import Mock, patch
 
 import pytest
 from paramiko.sftp import SFTP_FAILURE, SFTP_NO_SUCH_FILE
@@ -36,27 +36,27 @@ from airflow.sensors.base import PokeReturnValue
 class TestSFTPSensor:
     @patch("airflow.providers.sftp.sensors.sftp.SFTPHook")
     def test_file_present(self, sftp_hook_mock):
-        sftp_hook_mock.return_value.get_mod_time.return_value = 
"19700101000000"
+        sftp_hook_mock.return_value.isfile.return_value = True
         sftp_sensor = SFTPSensor(task_id="unit_test", 
path="/path/to/file/1970-01-01.txt")
         context = {"ds": "1970-01-01"}
         output = sftp_sensor.poke(context)
-        
sftp_hook_mock.return_value.get_mod_time.assert_called_once_with("/path/to/file/1970-01-01.txt")
+        
sftp_hook_mock.return_value.isfile.assert_called_once_with("/path/to/file/1970-01-01.txt")
         sftp_hook_mock.return_value.close_conn.assert_not_called()
         assert output
 
     @patch("airflow.providers.sftp.sensors.sftp.SFTPHook")
     def test_file_absent(self, sftp_hook_mock):
-        sftp_hook_mock.return_value.get_mod_time.side_effect = 
OSError(SFTP_NO_SUCH_FILE, "File missing")
+        sftp_hook_mock.return_value.isfile.side_effect = 
OSError(SFTP_NO_SUCH_FILE, "File missing")
         sftp_sensor = SFTPSensor(task_id="unit_test", 
path="/path/to/file/1970-01-01.txt")
         context = {"ds": "1970-01-01"}
         output = sftp_sensor.poke(context)
-        
sftp_hook_mock.return_value.get_mod_time.assert_called_once_with("/path/to/file/1970-01-01.txt")
+        
sftp_hook_mock.return_value.isfile.assert_called_once_with("/path/to/file/1970-01-01.txt")
         sftp_hook_mock.return_value.close_conn.assert_not_called()
         assert not output
 
     @patch("airflow.providers.sftp.sensors.sftp.SFTPHook")
     def test_sftp_failure(self, sftp_hook_mock):
-        sftp_hook_mock.return_value.get_mod_time.side_effect = 
OSError(SFTP_FAILURE, "SFTP failure")
+        sftp_hook_mock.return_value.isfile.side_effect = OSError(SFTP_FAILURE, 
"SFTP failure")
 
         sftp_sensor = SFTPSensor(task_id="unit_test", 
path="/path/to/file/1970-01-01.txt")
         context = {"ds": "1970-01-01"}
@@ -69,6 +69,7 @@ class TestSFTPSensor:
 
     @patch("airflow.providers.sftp.sensors.sftp.SFTPHook")
     def test_file_new_enough(self, sftp_hook_mock):
+        sftp_hook_mock.return_value.isfile.return_value = True
         sftp_hook_mock.return_value.get_mod_time.return_value = 
"19700101000000"
         tz = timezone("America/Toronto")
         sftp_sensor = SFTPSensor(
@@ -78,12 +79,14 @@ class TestSFTPSensor:
         )
         context = {"ds": "1970-01-00"}
         output = sftp_sensor.poke(context)
+        
sftp_hook_mock.return_value.isfile.assert_called_once_with("/path/to/file/1970-01-01.txt")
         
sftp_hook_mock.return_value.get_mod_time.assert_called_once_with("/path/to/file/1970-01-01.txt")
         sftp_hook_mock.return_value.close_conn.assert_not_called()
         assert output
 
     @patch("airflow.providers.sftp.sensors.sftp.SFTPHook")
     def test_file_not_new_enough(self, sftp_hook_mock):
+        sftp_hook_mock.return_value.isfile.return_value = True
         sftp_hook_mock.return_value.get_mod_time.return_value = 
"19700101000000"
         tz = timezone("Europe/Paris")
         sftp_sensor = SFTPSensor(
@@ -93,6 +96,7 @@ class TestSFTPSensor:
         )
         context = {"ds": "1970-01-00"}
         output = sftp_sensor.poke(context)
+        
sftp_hook_mock.return_value.isfile.assert_called_once_with("/path/to/file/1970-01-01.txt")
         
sftp_hook_mock.return_value.get_mod_time.assert_called_once_with("/path/to/file/1970-01-01.txt")
         sftp_hook_mock.return_value.close_conn.assert_not_called()
         assert not output
@@ -113,30 +117,29 @@ class TestSFTPSensor:
     )
     @patch("airflow.providers.sftp.sensors.sftp.SFTPHook")
     def test_multiple_datetime_format_in_newer_than(self, sftp_hook_mock, 
newer_than):
+        sftp_hook_mock.return_value.isfile.return_value = True
         sftp_hook_mock.return_value.get_mod_time.return_value = 
"19700101000000"
         sftp_sensor = SFTPSensor(
             task_id="unit_test", path="/path/to/file/1970-01-01.txt", 
newer_than=newer_than
         )
         context = {"ds": "1970-01-00"}
         output = sftp_sensor.poke(context)
+        
sftp_hook_mock.return_value.isfile.assert_called_once_with("/path/to/file/1970-01-01.txt")
         
sftp_hook_mock.return_value.get_mod_time.assert_called_once_with("/path/to/file/1970-01-01.txt")
         sftp_hook_mock.return_value.close_conn.assert_not_called()
         assert not output
 
     @patch("airflow.providers.sftp.sensors.sftp.SFTPHook")
     def test_file_present_with_pattern(self, sftp_hook_mock):
-        sftp_hook_mock.return_value.get_mod_time.return_value = 
"19700101000000"
         sftp_hook_mock.return_value.get_files_by_pattern.return_value = 
["text_file.txt"]
         sftp_sensor = SFTPSensor(task_id="unit_test", path="/path/to/file/", 
file_pattern="*.txt")
         context = {"ds": "1970-01-01"}
         output = sftp_sensor.poke(context)
-        
sftp_hook_mock.return_value.get_mod_time.assert_called_once_with("/path/to/file/text_file.txt")
         sftp_hook_mock.return_value.close_conn.assert_not_called()
         assert output
 
     @patch("airflow.providers.sftp.sensors.sftp.SFTPHook")
     def test_file_not_present_with_pattern(self, sftp_hook_mock):
-        sftp_hook_mock.return_value.get_mod_time.return_value = 
"19700101000000"
         sftp_hook_mock.return_value.get_files_by_pattern.return_value = []
         sftp_sensor = SFTPSensor(task_id="unit_test", path="/path/to/file/", 
file_pattern="*.txt")
         context = {"ds": "1970-01-01"}
@@ -146,7 +149,6 @@ class TestSFTPSensor:
 
     @patch("airflow.providers.sftp.sensors.sftp.SFTPHook")
     def test_multiple_files_present_with_pattern(self, sftp_hook_mock):
-        sftp_hook_mock.return_value.get_mod_time.return_value = 
"19700101000000"
         sftp_hook_mock.return_value.get_files_by_pattern.return_value = [
             "text_file.txt",
             "another_text_file.txt",
@@ -154,10 +156,7 @@ class TestSFTPSensor:
         sftp_sensor = SFTPSensor(task_id="unit_test", path="/path/to/file/", 
file_pattern="*.txt")
         context = {"ds": "1970-01-01"}
         output = sftp_sensor.poke(context)
-        get_mod_time = sftp_hook_mock.return_value.get_mod_time
-        expected_calls = [call("/path/to/file/text_file.txt"), 
call("/path/to/file/another_text_file.txt")]
         sftp_hook_mock.return_value.close_conn.assert_not_called()
-        assert get_mod_time.mock_calls == expected_calls
         assert output
 
     @patch("airflow.providers.sftp.sensors.sftp.SFTPHook")
@@ -227,7 +226,7 @@ class TestSFTPSensor:
     )
     @patch("airflow.providers.sftp.sensors.sftp.SFTPHook")
     def test_file_path_present_with_callback(self, sftp_hook_mock, op_args, 
op_kwargs):
-        sftp_hook_mock.return_value.get_mod_time.return_value = 
"19700101000000"
+        sftp_hook_mock.return_value.isfile.return_value = True
         sample_callable = Mock()
         sample_callable.return_value = ["sample_return"]
         sftp_sensor = SFTPSensor(
@@ -240,7 +239,7 @@ class TestSFTPSensor:
         context = {"ds": "1970-01-01"}
         output = sftp_sensor.poke(context)
 
-        
sftp_hook_mock.return_value.get_mod_time.assert_called_once_with("/path/to/file/1970-01-01.txt")
+        
sftp_hook_mock.return_value.isfile.assert_called_once_with("/path/to/file/1970-01-01.txt")
         sftp_hook_mock.return_value.close_conn.assert_not_called()
         sample_callable.assert_called_once_with(*op_args, **op_kwargs)
         assert isinstance(output, PokeReturnValue)
@@ -259,7 +258,6 @@ class TestSFTPSensor:
     )
     @patch("airflow.providers.sftp.sensors.sftp.SFTPHook")
     def test_file_pattern_present_with_callback(self, sftp_hook_mock, op_args, 
op_kwargs):
-        sftp_hook_mock.return_value.get_mod_time.return_value = 
"19700101000000"
         sample_callable = Mock()
         sample_callable.return_value = ["sample_return"]
         sftp_hook_mock.return_value.get_files_by_pattern.return_value = [
@@ -285,3 +283,27 @@ class TestSFTPSensor:
             "files_found": ["/path/to/file/text_file.txt", 
"/path/to/file/another_text_file.txt"],
             "decorator_return_value": ["sample_return"],
         }
+
+    @patch("airflow.providers.sftp.sensors.sftp.SFTPHook")
+    def test_mod_time_called_when_newer_than_set(self, sftp_hook_mock):
+        sftp_hook_mock.return_value.isfile.return_value = True
+        sftp_hook_mock.return_value.get_mod_time.return_value = 
"19700101000000"
+        tz = timezone("America/Toronto")
+        sftp_sensor = SFTPSensor(
+            task_id="unit_test",
+            path="/path/to/file/1970-01-01.txt",
+            newer_than=tz.convert(datetime(1960, 1, 2)),
+        )
+        context = {"ds": "1970-01-01"}
+        sftp_sensor.poke(context)
+        
sftp_hook_mock.return_value.isfile.assert_called_once_with("/path/to/file/1970-01-01.txt")
+        
sftp_hook_mock.return_value.get_mod_time.assert_called_once_with("/path/to/file/1970-01-01.txt")
+
+    @patch("airflow.providers.sftp.sensors.sftp.SFTPHook")
+    def test_mod_time_not_called_when_newer_than_not_set(self, sftp_hook_mock):
+        sftp_hook_mock.return_value.isfile.return_value = True
+        sftp_hook_mock.return_value.get_mod_time.return_value = 
"19700101000000"
+        sftp_sensor = SFTPSensor(task_id="unit_test", 
path="/path/to/file/1970-01-01.txt")
+        context = {"ds": "1970-01-01"}
+        sftp_sensor.poke(context)
+        sftp_hook_mock.return_value.get_mod_time.assert_not_called()

Reply via email to