benitezfede commented on code in PR #53654:
URL: https://github.com/apache/airflow/pull/53654#discussion_r2225736740
##########
airflow-core/src/airflow/models/dagrun.py:
##########
@@ -1356,6 +1356,7 @@ def handle_dag_callback(self, dag: SDKDAG, success: bool = True, reason: str = "
"dag": dag,
"run_id": str(self.run_id),
"reason": reason,
+ "dag_run": self,
Review Comment:
@kaxil thanks for reviewing and pointing out where the changes should go. If I
make this change to `_execute_dag_callbacks` in `processor.py`:
```
def _execute_dag_callbacks(dagbag: DagBag, request: DagCallbackRequest, log: FilteringBoundLogger) -> None:
    ...
    callbacks = callbacks if isinstance(callbacks, list) else [callbacks]
    dag_run = dag.get_dagrun(run_id=request.run_id)  # NEW LINE
    context: Context = {
        "dag": dag,
        "run_id": request.run_id,
        "reason": request.msg,
        "dag_run": dag_run,
    }
    ...
```
I get this error:
```
--- Supervised process Last chance exception handler ---
Traceback (most recent call last):
  File "/opt/airflow/task-sdk/src/airflow/sdk/execution_time/supervisor.py", line 382, in _fork_main
    target()
  File "/opt/airflow/airflow-core/src/airflow/dag_processing/processor.py", line 150, in _parse_file_entrypoint
    result = _parse_file(msg, log)
  File "/opt/airflow/airflow-core/src/airflow/dag_processing/processor.py", line 166, in _parse_file
    _execute_callbacks(bag, msg.callback_requests, log)
  File "/opt/airflow/airflow-core/src/airflow/dag_processing/processor.py", line 208, in _execute_callbacks
    _execute_dag_callbacks(dagbag, request, log)
  File "/opt/airflow/airflow-core/src/airflow/dag_processing/processor.py", line 221, in _execute_dag_callbacks
    dag_run = dag.get_dagrun(run_id=request.run_id)
  File "/opt/airflow/airflow-core/src/airflow/utils/session.py", line 99, in wrapper
    with create_session() as session:
  File "/usr/local/lib/python3.10/contextlib.py", line 135, in __enter__
    return next(self.gen)
  File "/opt/airflow/airflow-core/src/airflow/utils/session.py", line 40, in create_session
    session = Session()
  File "/opt/airflow/task-sdk/src/airflow/sdk/execution_time/supervisor.py", line 257, in __init__
    raise RuntimeError("Direct database access via the ORM is not allowed in Airflow 3.0")
RuntimeError: Direct database access via the ORM is not allowed in Airflow 3.0
```
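Reading the traceback, the failure seems to come from the supervised process replacing the session class with one that raises as soon as it is constructed, so any `create_session()` call fails before a connection is opened. Below is a minimal sketch of that pattern as I understand it. `BlockedSession`, `create_session`, and `get_dagrun` here are hypothetical stand-ins written for illustration, not the actual Airflow code:
```python
from contextlib import contextmanager


class BlockedSession:
    """Hypothetical stand-in for the session class used in the child process."""

    def __init__(self, *args, **kwargs):
        # Constructing the session is enough to fail; no query is ever issued.
        raise RuntimeError("Direct database access via the ORM is not allowed in Airflow 3.0")


# Hypothetical module-level factory; rebinding it is what disables ORM access.
Session = BlockedSession


@contextmanager
def create_session():
    session = Session()  # raises here, inside __enter__ of the context manager
    yield session


def get_dagrun(run_id):
    """Hypothetical ORM-backed lookup, standing in for dag.get_dagrun()."""
    with create_session() as session:
        return session, run_id


def try_get_dagrun(run_id):
    """Return the error message if the lookup is blocked, else None."""
    try:
        get_dagrun(run_id)
    except RuntimeError as exc:
        return str(exc)
    return None
```
If this reading is right, any helper that opens a session internally will hit the same error inside the DAG processor, not only `dag.get_dagrun`.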
So how can I get the `dag_run` information into the callback context without
direct ORM access from the DAG processor?
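One direction I could imagine, purely as an assumption and not something confirmed in this thread, is to have whichever component creates the callback request attach a plain-data copy of the run, so the processor only reads it. In this sketch `DagRunSnapshot`, `CallbackRequestSketch`, its `dag_run` field, and `build_context` are all hypothetical names; the real `DagCallbackRequest` may not carry such a field:
```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any


@dataclass
class DagRunSnapshot:
    """Hypothetical plain-data copy of the DagRun fields a callback might need."""

    dag_id: str
    run_id: str
    state: str


@dataclass
class CallbackRequestSketch:
    """Hypothetical request shape; `dag_run` is an assumed extra field."""

    dag_id: str
    run_id: str
    msg: str
    dag_run: DagRunSnapshot | None = None


def build_context(dag: Any, request: CallbackRequestSketch) -> dict[str, Any]:
    """Build the callback context from the request alone, with no DB lookup."""
    context: dict[str, Any] = {
        "dag": dag,
        "run_id": request.run_id,
        "reason": request.msg,
    }
    # Only expose dag_run when the sender actually supplied it.
    if request.dag_run is not None:
        context["dag_run"] = request.dag_run
    return context
```
The point of the sketch is only that the lookup would happen on the side that is allowed to query the database, and the processor would receive the result as data.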