This is an automated email from the ASF dual-hosted git repository.

husseinawala pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new c627311863 feat(aws): provide the context to check_fn in S3 sensor (#40686)
c627311863 is described below

commit c62731186385925da82aecb728b61a7b194eafaa
Author: Hussein Awala <[email protected]>
AuthorDate: Wed Jul 10 17:14:37 2024 +0200

    feat(aws): provide the context to check_fn in S3 sensor (#40686)
---
 airflow/providers/amazon/aws/sensors/s3.py      | 16 +++++++++++-----
 tests/system/providers/amazon/aws/example_s3.py |  2 +-
 2 files changed, 12 insertions(+), 6 deletions(-)

diff --git a/airflow/providers/amazon/aws/sensors/s3.py b/airflow/providers/amazon/aws/sensors/s3.py
index adcdcbf010..9c524494cd 100644
--- a/airflow/providers/amazon/aws/sensors/s3.py
+++ b/airflow/providers/amazon/aws/sensors/s3.py
@@ -18,6 +18,7 @@
 from __future__ import annotations
 
 import fnmatch
+import inspect
 import os
 import re
 from datetime import datetime, timedelta
@@ -57,13 +58,13 @@ class S3KeySensor(BaseSensorOperator):
         refers to this bucket
     :param wildcard_match: whether the bucket_key should be interpreted as a
         Unix wildcard pattern
-    :param check_fn: Function that receives the list of the S3 objects,
+    :param check_fn: Function that receives the list of the S3 objects with the context values,
         and returns a boolean:
         - ``True``: the criteria is met
         - ``False``: the criteria isn't met
         **Example**: Wait for any S3 object size more than 1 megabyte  ::
 
-            def check_fn(files: List) -> bool:
+            def check_fn(files: List, **kwargs) -> bool:
                 return any(f.get('Size', 0) > 1048576 for f in files)
     :param aws_conn_id: a reference to the s3 connection
     :param verify: Whether to verify SSL certificates for S3 connection.
@@ -112,7 +113,7 @@ class S3KeySensor(BaseSensorOperator):
         self.use_regex = use_regex
         self.metadata_keys = metadata_keys if metadata_keys else ["Size"]
 
-    def _check_key(self, key):
+    def _check_key(self, key, context: Context):
         bucket_name, key = S3Hook.get_s3_bucket_key(self.bucket_name, key, "bucket_name", "bucket_key")
         self.log.info("Poking for key : s3://%s/%s", bucket_name, key)
 
@@ -167,15 +168,20 @@ class S3KeySensor(BaseSensorOperator):
             files = [metadata]
 
         if self.check_fn is not None:
+            # For backwards compatibility, check if the function takes a context argument
+            signature = inspect.signature(self.check_fn)
+            if any(param.kind == inspect.Parameter.VAR_KEYWORD for param in signature.parameters.values()):
+                return self.check_fn(files, **context)
+            # Otherwise, just pass the files
             return self.check_fn(files)
 
         return True
 
     def poke(self, context: Context):
         if isinstance(self.bucket_key, str):
-            return self._check_key(self.bucket_key)
+            return self._check_key(self.bucket_key, context=context)
         else:
-            return all(self._check_key(key) for key in self.bucket_key)
+            return all(self._check_key(key, context=context) for key in self.bucket_key)
 
     def execute(self, context: Context) -> None:
         """Airflow runs this method on the worker and defers using the 
trigger."""
diff --git a/tests/system/providers/amazon/aws/example_s3.py b/tests/system/providers/amazon/aws/example_s3.py
index 3b4a4bd38b..06f60b4ac9 100644
--- a/tests/system/providers/amazon/aws/example_s3.py
+++ b/tests/system/providers/amazon/aws/example_s3.py
@@ -74,7 +74,7 @@ with DAG(
     key_regex_pattern = ".*-key"
 
     # [START howto_sensor_s3_key_function_definition]
-    def check_fn(files: list) -> bool:
+    def check_fn(files: list, **kwargs) -> bool:
         """
         Example of custom check: check if all files are bigger than ``20 bytes``
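
As a usage sketch building on the example above, a context-aware check_fn can now vary its behavior per run, e.g. by reading the logical date from the forwarded context. The bucket, key, and task id below are placeholders, and the sensor would normally be instantiated inside the ``with DAG(...)`` block shown in the example file:

    from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

    def check_fn(files: list, **kwargs) -> bool:
        # kwargs now carries the task context (ti, ds, logical_date, ...),
        # so the check can differ per run; "ds" is the logical date stamp.
        print("Checking %d file(s) for run %s" % (len(files), kwargs.get("ds")))
        return all(f.get("Size", 0) > 20 for f in files)

    wait_for_csv = S3KeySensor(
        task_id="wait_for_csv",
        bucket_name="my-example-bucket",  # placeholder bucket
        bucket_key="data/*.csv",          # placeholder key pattern
        wildcard_match=True,
        check_fn=check_fn,
    )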
 
