carlinix opened a new pull request, #56910:
URL: https://github.com/apache/airflow/pull/56910

   This PR fixes two inconsistencies between deferrable and non-deferrable 
execution modes of the `S3KeySensor`:
   
   1. Task context (`context`) is lost when resuming from the trigger.
   2. `metadata_keys` is ignored — the deferrable trigger (`S3KeyTrigger`) only 
returns object names (`list[str]`) instead of metadata dictionaries.
   
   Both behaviors cause user-defined `check_fn` functions to fail or behave 
inconsistently between execution modes.
   
   ### Reproduction
   
   Use the following DAG:
   
   ```python
    from __future__ import annotations
    from datetime import datetime, UTC
    from typing import Any

    from airflow.sdk import DAG
    from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
    from airflow.sdk.execution_time.context import get_current_context


    def check_new_objects(files: list[dict[str, Any]], **context: Any) -> bool:
        # In deferrable mode, context arrives empty, so we must fall back
        # to get_current_context() to reach the task instance at all.
        ti = context.get("ti") or get_current_context().get("ti")
        extractor = ti.xcom_pull(task_ids="start_task", key="extractor") or {}

        print(type(files), files)
        print(type(context), context)
        return True


    with DAG(
        dag_id="s3_key_sensor_repro",
        start_date=datetime(2024, 1, 1, tzinfo=UTC),
        schedule=None,
    ):
        check_s3 = S3KeySensor(
            task_id="check_new_s3_objects",
            bucket_name="example-bucket",
            bucket_key="example-prefix/*",
            aws_conn_id="aws_default",
            wildcard_match=True,
            metadata_keys=["Key", "LastModified", "Size", "ETag"],
            poke_interval=30,
            timeout=300,
            deferrable=True,
            check_fn=check_new_objects,
        )
   ```
   
   Observed logs:
   
   ```
   <class 'list'> ['XYZ YTD Export.csv', 'XYZ_Weekly_Export.csv']
   <class 'dict'> {}
   ```
   
    * With `deferrable=True`, `files` is a list of bare key strings and `context` is an empty dict.
    * With `deferrable=False`, `files` contains the requested metadata dictionaries and `context` is fully populated, as expected.
   
   ### Root Cause
   
   1. Context loss:
   `S3KeySensor.execute_complete()` currently calls:
   `found_keys = self.check_fn(event["files"])`
   
   Instead of passing `context` like `_check_key()` does:
   
   ```python
    if any(
        param.kind == inspect.Parameter.VAR_KEYWORD
        for param in signature.parameters.values()
    ):
       return self.check_fn(files, **context)
   ```
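    The fix replicates that check in `execute_complete()`. A standalone sketch of the dispatch logic (the helper and function names below are illustrative, not the exact provider code):

    ```python
    import inspect
    from typing import Any, Callable

    def call_check_fn(check_fn: Callable, files: list, context: dict[str, Any]):
        """Forward **context only when check_fn declares a **kwargs
        parameter, mirroring the inspection done in _check_key()."""
        signature = inspect.signature(check_fn)
        if any(
            param.kind == inspect.Parameter.VAR_KEYWORD
            for param in signature.parameters.values()
        ):
            return check_fn(files, **context)
        return check_fn(files)

    # A check_fn that accepts **context sees the task context:
    def with_context(files, **context):
        return bool(files) and "ti" in context

    # A legacy check_fn without **kwargs keeps working unchanged:
    def without_context(files):
        return bool(files)
    ```

    With this dispatch in place, both code paths hand the same arguments to user code.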
   
   2. Metadata loss:
   
   `S3KeyTrigger` emits only:
   `TriggerEvent({"status": "running", "files": keys})` where `keys` is a 
`list[str]` from `get_files_async()`, ignoring `metadata_keys`.
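    A minimal sketch of the metadata side of the fix: before the trigger emits its event, each object record can be projected onto the requested `metadata_keys`. The helper name and the hard-coded sample page below are illustrative; the real trigger fetches pages from `list_objects_v2` / `head_object` asynchronously.

    ```python
    from typing import Any

    def build_file_metadata(
        objects: list[dict[str, Any]], metadata_keys: list[str]
    ) -> list[dict[str, Any]]:
        """Project each S3 object record onto the requested metadata keys.

        `objects` mimics the "Contents" entries of a list_objects_v2 page;
        metadata_keys == ["*"] keeps every available field, matching the
        non-deferrable sensor's behavior.
        """
        if metadata_keys == ["*"]:
            return [dict(obj) for obj in objects]
        return [
            {key: obj[key] for key in metadata_keys if key in obj}
            for obj in objects
        ]

    # Sample page contents shaped like list_objects_v2 output (values made up):
    page = [
        {"Key": "a.csv", "Size": 10, "ETag": '"abc"', "StorageClass": "STANDARD"},
        {"Key": "b.csv", "Size": 20, "ETag": '"def"', "StorageClass": "STANDARD"},
    ]

    # Each event entry is now a metadata dict rather than a bare key string:
    files = build_file_metadata(page, ["Key", "Size"])
    ```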
   
   ### Fixes Introduced
   
   - Fix 1: Pass task `context` properly to `check_fn` in `execute_complete()` 
by replicating the signature inspection logic from `_check_key()`.
   
    - Fix 2: Add `metadata_keys` support to `S3KeyTrigger`, allowing metadata 
retrieval (via `list_objects_v2` or `head_object`) in deferrable mode.
   
   ### Expected Behavior
   Both modes (`deferrable=True` and `deferrable=False`) now behave 
consistently:
   
   * `check_fn` always receives files as a list of metadata dictionaries.
    * `context` always includes the task instance and DAG context values (`ti`, 
`dag_run`, etc.).
   
   `metadata_keys=["*"]` returns full `head_object()` data.
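    Under the consistent contract, a `check_fn` can rely on metadata fields in both modes. A hypothetical example (function name, cutoff, and sample values are made up for illustration):

    ```python
    from datetime import datetime, timezone
    from typing import Any

    def only_recent_nonempty(files: list[dict[str, Any]], **context: Any) -> bool:
        """Pass only when every matched object is non-empty and was
        modified after the cutoff; behaves identically in both modes."""
        cutoff = datetime(2024, 1, 1, tzinfo=timezone.utc)
        return all(
            f["Size"] > 0 and f["LastModified"] >= cutoff for f in files
        )

    # Sample metadata dicts as check_fn now receives them:
    sample = [
        {"Key": "x.csv", "Size": 5,
         "LastModified": datetime(2024, 6, 1, tzinfo=timezone.utc)},
    ]
    ```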
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
