seanghaeli commented on code in PR #66608:
URL: https://github.com/apache/airflow/pull/66608#discussion_r3314228558
##########
airflow-core/src/airflow/jobs/triggerer_job_runner.py:
##########
@@ -1184,6 +1222,42 @@ def create_runtime_ti(
),
)
+ @staticmethod
+ def _build_context_from_dag_run_data(dag_run_data: dict) -> Context:
+ """
+ Build a standard Context dict from serialized dag_run_data for
callback triggers.
+
+ This provides the same DagRun-level context fields (dag_run, run_id,
logical_date,
+ ds, ts, etc.) that task-bound triggers get via
RuntimeTaskInstance.get_template_context(),
+ but without task-specific fields since callbacks are not tied to a
task.
+ """
+ from airflow._shared.timezones.timezone import coerce_datetime
+ from airflow.api_fastapi.execution_api.datamodels.taskinstance import
DagRun as DRDataModel
+
+ dag_run = DRDataModel(**dag_run_data)
+ context: Context = {"dag_run": dag_run} # type: ignore[typeddict-item]
+
+ if logical_date := coerce_datetime(dag_run.logical_date):
+ ds = logical_date.strftime("%Y-%m-%d")
+ ts = logical_date.isoformat()
+ context.update(
+ {
+ "logical_date": logical_date,
+ "run_id": dag_run.run_id,
+ "ds": ds,
+ "ds_nodash": ds.replace("-", ""),
+ "ts": ts,
+ "ts_nodash": logical_date.strftime("%Y%m%dT%H%M%S"),
+ "ts_nodash_with_tz": ts.replace("-", "").replace(":", ""),
+ "data_interval_start":
coerce_datetime(dag_run.data_interval_start),
+ "data_interval_end":
coerce_datetime(dag_run.data_interval_end),
Review Comment:
Good idea, implemented shared helper
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]