benitezfede commented on code in PR #53654:
URL: https://github.com/apache/airflow/pull/53654#discussion_r2225736740


##########
airflow-core/src/airflow/models/dagrun.py:
##########
@@ -1356,6 +1356,7 @@ def handle_dag_callback(self, dag: SDKDAG, success: bool = True, reason: str = "
             "dag": dag,
             "run_id": str(self.run_id),
             "reason": reason,
+            "dag_run": self,

Review Comment:
   @kaxil thanks for reviewing and pointing out where the changes should go. If I make this change in `_execute_dag_callbacks` in `processor.py`:
   
   ```
   def _execute_dag_callbacks(dagbag: DagBag, request: DagCallbackRequest, log: FilteringBoundLogger) -> None:
       ...

       callbacks = callbacks if isinstance(callbacks, list) else [callbacks]

       dag_run = dag.get_dagrun(run_id=request.run_id)  # NEW LINE

       context: Context = {
           "dag": dag,
           "run_id": request.run_id,
           "reason": request.msg,
           "dag_run": dag_run,
       }
       ...
   ```
    
   I get this error:

   ```
   --- Supervised process Last chance exception handler ---
   Traceback (most recent call last):
     File "/opt/airflow/task-sdk/src/airflow/sdk/execution_time/supervisor.py", line 382, in _fork_main
       target()
     File "/opt/airflow/airflow-core/src/airflow/dag_processing/processor.py", line 150, in _parse_file_entrypoint
       result = _parse_file(msg, log)
     File "/opt/airflow/airflow-core/src/airflow/dag_processing/processor.py", line 166, in _parse_file
       _execute_callbacks(bag, msg.callback_requests, log)
     File "/opt/airflow/airflow-core/src/airflow/dag_processing/processor.py", line 208, in _execute_callbacks
       _execute_dag_callbacks(dagbag, request, log)
     File "/opt/airflow/airflow-core/src/airflow/dag_processing/processor.py", line 221, in _execute_dag_callbacks
       dag_run = dag.get_dagrun(run_id=request.run_id)
     File "/opt/airflow/airflow-core/src/airflow/utils/session.py", line 99, in wrapper
       with create_session() as session:
     File "/usr/local/lib/python3.10/contextlib.py", line 135, in __enter__
       return next(self.gen)
     File "/opt/airflow/airflow-core/src/airflow/utils/session.py", line 40, in create_session
       session = Session()
     File "/opt/airflow/task-sdk/src/airflow/sdk/execution_time/supervisor.py", line 257, in __init__
       raise RuntimeError("Direct database access via the ORM is not allowed in Airflow 3.0")
   RuntimeError: Direct database access via the ORM is not allowed in Airflow 3.0
   ```
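
My reading of the traceback, as a minimal sketch rather than the actual supervisor code: inside the supervised DAG processor process the `Session` factory seems to be replaced by a class whose `__init__` raises, so any helper that opens a session fails before a query is even attempted. The names `BlockedSession`, `get_dagrun` and `try_get_dagrun` below are hypothetical stand-ins I made up to reproduce the behaviour in isolation.

```python
from contextlib import contextmanager

ERROR_MESSAGE = "Direct database access via the ORM is not allowed in Airflow 3.0"


class BlockedSession:
    """Hypothetical stand-in for the session class installed in the child process."""

    def __init__(self, *args, **kwargs):
        # Constructing the session is already an error, so no query can run.
        raise RuntimeError(ERROR_MESSAGE)


# Assumption: the real factory is swapped for the blocking class at process start.
Session = BlockedSession


@contextmanager
def create_session():
    """Simplified session context manager; fails when Session() is called."""
    session = Session()
    yield session


def get_dagrun(run_id):
    """Hypothetical ORM-backed lookup, standing in for dag.get_dagrun()."""
    with create_session() as session:
        return session.query(run_id)


def try_get_dagrun(run_id):
    """Return the error message if the lookup is blocked, else None."""
    try:
        get_dagrun(run_id)
    except RuntimeError as exc:
        return str(exc)
    return None
```

If this picture is right, every call path that ends in `create_session()` would hit the same error in the processor, not only `get_dagrun`.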
   
   I also checked the example you posted. The task callback does something similar [here](https://github.com/apache/airflow/pull/53058/files#diff-564fd0a8fbe4cc47864a8043fcc1389b33120c88bb35852b26f45c36b902f70bR252): `task = dag.get_task(request.ti.task_id)`. But the `dag` there is [this one](https://github.com/apache/airflow/blob/main/task-sdk/src/airflow/sdk/definitions/dag.py#L899) from the SDK, and I do not see any way to get the `dag_run` from it either.
   
   So how can I get the `dag_run` information and pass it to the callback 
context?
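
One direction I can imagine, purely as an assumption on my side: the side that creates the callback request (which I assume still has database access) copies the run fields it needs into the request, so the processor only reads them and never queries. `DagRunSnapshot` and `CallbackRequestSketch` are hypothetical names for illustration, not the real `DagCallbackRequest`.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class DagRunSnapshot:
    """Hypothetical plain-data copy of the DagRun fields a callback might need."""

    dag_id: str
    run_id: str
    state: str


@dataclass
class CallbackRequestSketch:
    """Hypothetical request shape carrying the run data along with the message."""

    dag_id: str
    run_id: str
    msg: str
    dag_run: Optional[DagRunSnapshot] = None


def build_context(dag: Any, request: CallbackRequestSketch) -> dict:
    """Build the callback context from the request alone, with no ORM access."""
    return {
        "dag": dag,
        "run_id": request.run_id,
        "reason": request.msg,
        "dag_run": request.dag_run,
    }
```

With something like this, the processor would only read `request.dag_run`. I do not know whether it fits the intended design.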



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
