This is an automated email from the ASF dual-hosted git repository.
husseinawala pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new c627311863 feat(aws): provide the context to check_fn in S3 sensor (#40686)
c627311863 is described below
commit c62731186385925da82aecb728b61a7b194eafaa
Author: Hussein Awala <[email protected]>
AuthorDate: Wed Jul 10 17:14:37 2024 +0200
feat(aws): provide the context to check_fn in S3 sensor (#40686)
---
airflow/providers/amazon/aws/sensors/s3.py | 16 +++++++++++-----
tests/system/providers/amazon/aws/example_s3.py | 2 +-
2 files changed, 12 insertions(+), 6 deletions(-)
diff --git a/airflow/providers/amazon/aws/sensors/s3.py b/airflow/providers/amazon/aws/sensors/s3.py
index adcdcbf010..9c524494cd 100644
--- a/airflow/providers/amazon/aws/sensors/s3.py
+++ b/airflow/providers/amazon/aws/sensors/s3.py
@@ -18,6 +18,7 @@
from __future__ import annotations
import fnmatch
+import inspect
import os
import re
from datetime import datetime, timedelta
@@ -57,13 +58,13 @@ class S3KeySensor(BaseSensorOperator):
refers to this bucket
:param wildcard_match: whether the bucket_key should be interpreted as a
Unix wildcard pattern
- :param check_fn: Function that receives the list of the S3 objects,
+ :param check_fn: Function that receives the list of the S3 objects with the context values,
and returns a boolean:
- ``True``: the criteria is met
- ``False``: the criteria isn't met
**Example**: Wait for any S3 object size more than 1 megabyte ::
- def check_fn(files: List) -> bool:
+ def check_fn(files: List, **kwargs) -> bool:
return any(f.get('Size', 0) > 1048576 for f in files)
:param aws_conn_id: a reference to the s3 connection
:param verify: Whether to verify SSL certificates for S3 connection.
@@ -112,7 +113,7 @@ class S3KeySensor(BaseSensorOperator):
self.use_regex = use_regex
self.metadata_keys = metadata_keys if metadata_keys else ["Size"]
- def _check_key(self, key):
+ def _check_key(self, key, context: Context):
bucket_name, key = S3Hook.get_s3_bucket_key(self.bucket_name, key, "bucket_name", "bucket_key")
self.log.info("Poking for key : s3://%s/%s", bucket_name, key)
@@ -167,15 +168,20 @@ class S3KeySensor(BaseSensorOperator):
files = [metadata]
if self.check_fn is not None:
+ # For backwards compatibility, check if the function takes a context argument
+ signature = inspect.signature(self.check_fn)
+ if any(param.kind == inspect.Parameter.VAR_KEYWORD for param in signature.parameters.values()):
+ return self.check_fn(files, **context)
+ # Otherwise, just pass the files
return self.check_fn(files)
return True
def poke(self, context: Context):
if isinstance(self.bucket_key, str):
- return self._check_key(self.bucket_key)
+ return self._check_key(self.bucket_key, context=context)
else:
- return all(self._check_key(key) for key in self.bucket_key)
+ return all(self._check_key(key, context=context) for key in self.bucket_key)
def execute(self, context: Context) -> None:
"""Airflow runs this method on the worker and defers using the
trigger."""
diff --git a/tests/system/providers/amazon/aws/example_s3.py b/tests/system/providers/amazon/aws/example_s3.py
index 3b4a4bd38b..06f60b4ac9 100644
--- a/tests/system/providers/amazon/aws/example_s3.py
+++ b/tests/system/providers/amazon/aws/example_s3.py
@@ -74,7 +74,7 @@ with DAG(
key_regex_pattern = ".*-key"
# [START howto_sensor_s3_key_function_definition]
- def check_fn(files: list) -> bool:
+ def check_fn(files: list, **kwargs) -> bool:
"""
Example of custom check: check if all files are bigger than ``20 bytes``
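With this change, a check_fn that declares **kwargs receives the task context as keyword arguments, so values such as the TaskInstance ("ti") become available inside the check. A hedged usage sketch (the task id, bucket, and key below are illustrative, not from the commit) ::

    from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

    def check_fn(files: list, **kwargs) -> bool:
        # The sensor detects the **kwargs parameter and passes the
        # Airflow context through as keyword arguments.
        ti = kwargs["ti"]  # TaskInstance from the context
        ti.xcom_push(key="file_sizes", value=[f.get("Size", 0) for f in files])
        return all(f.get("Size", 0) > 20 for f in files)

    wait_for_key = S3KeySensor(
        task_id="wait_for_key",
        bucket_name="my-bucket",   # illustrative
        bucket_key="my-prefix/*",  # illustrative
        wildcard_match=True,
        check_fn=check_fn,
    )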