SoftDed opened a new issue, #36837:
URL: https://github.com/apache/airflow/issues/36837

   ### Apache Airflow version
   
   2.8.0
   
   ### If "Other Airflow 2 version" selected, which one?
   
   _No response_
   
   ### What happened?
   
   When a DAG is scheduled on two or more Datasets, accessing the 
`source_task_instance` attribute of a `DatasetEvent` raises an error.
   
   ### What you think should happen instead?
   
   It should be possible to access all fields of every DatasetEvent.
   
   ### How to reproduce
   
   Create two test DAGs (producer and consumer) and link them with two DataSets.
   Code:
   ```python
   from __future__ import annotations
   
   from datetime import datetime
   
   from airflow.datasets import Dataset
   from airflow.decorators import task, dag
   
   testDataSet = Dataset('/test')
   testDataSet2 = Dataset('/test2')
   
   
   @dag(
       dag_id='dag_producer',
       schedule=None,
       start_date=datetime(2021, 1, 1),
       catchup=False,
   )
   def dag_producer():
       @task(outlets=[testDataSet, testDataSet2])
       def test():
           return 'OK'
   
       test()
   
   
   @dag(
       dag_id="dag_consumer",
       schedule=[testDataSet, testDataSet2],
       start_date=datetime(2021, 1, 1),
       catchup=False,
   
   )
   def dag_consumer():
       @task
       def print_triggering_dataset_events(triggering_dataset_events=None):
           for dataset, dataset_list in triggering_dataset_events.items():
               for dataset_event in dataset_list:
                   print('Task result: ', dataset_event.source_task_instance.xcom_pull(task_ids='test'))
   
       print_triggering_dataset_events()
   
   
   dag_consumer()
   dag_producer()
   ```
   
   ### Operating System
   
   Debian GNU/Linux 12 (bookworm)
   
   ### Versions of Apache Airflow Providers
   
   airflow@4e6c155ede93:/opt/airflow$ pip freeze | grep provider
   apache-airflow-providers-amazon==8.13.0
   apache-airflow-providers-celery==3.5.0
   apache-airflow-providers-cncf-kubernetes==7.11.0
   apache-airflow-providers-common-io==1.1.0
   apache-airflow-providers-common-sql==1.9.0
   apache-airflow-providers-docker==3.8.2
   apache-airflow-providers-elasticsearch==5.3.0
   apache-airflow-providers-ftp==3.7.0
   apache-airflow-providers-google==10.12.0
   apache-airflow-providers-grpc==3.4.0
   apache-airflow-providers-hashicorp==3.6.0
   apache-airflow-providers-http==4.8.0
   apache-airflow-providers-imap==3.5.0
   apache-airflow-providers-microsoft-azure==8.4.0
   apache-airflow-providers-mysql==5.5.0
   apache-airflow-providers-odbc==4.2.0
   apache-airflow-providers-openlineage==1.3.0
   apache-airflow-providers-postgres==5.9.0
   apache-airflow-providers-redis==3.5.0
   apache-airflow-providers-sendgrid==3.4.0
   apache-airflow-providers-sftp==4.8.0
   apache-airflow-providers-slack==8.5.0
   apache-airflow-providers-snowflake==5.2.0
   apache-airflow-providers-sqlite==3.6.0
   apache-airflow-providers-ssh==3.9.0
   
   ### Deployment
   
   Docker-Compose
   
   ### Deployment details
   
   _No response_
   
   ### Anything else?
   
   # Error in log:
   ```
   4e6c155ede93
   *** Found local files:
   ***   * /opt/airflow/logs/dag_id=dag_consumer/run_id=dataset_triggered__2024-01-17T10:15:09.140192+00:00/task_id=print_triggering_dataset_events/attempt=1.log
   [2024-01-17, 18:15:10 +08] {taskinstance.py:1957} INFO - Dependencies all 
met for dep_context=non-requeueable deps ti=<TaskInstance: 
dag_consumer.print_triggering_dataset_events 
dataset_triggered__2024-01-17T10:15:09.140192+00:00 [queued]>
   [2024-01-17, 18:15:10 +08] {taskinstance.py:1957} INFO - Dependencies all 
met for dep_context=requeueable deps ti=<TaskInstance: 
dag_consumer.print_triggering_dataset_events 
dataset_triggered__2024-01-17T10:15:09.140192+00:00 [queued]>
   [2024-01-17, 18:15:10 +08] {taskinstance.py:2171} INFO - Starting attempt 1 
of 1
   [2024-01-17, 18:15:10 +08] {taskinstance.py:2192} INFO - Executing 
<Task(_PythonDecoratedOperator): print_triggering_dataset_events> on 2024-01-17 
10:15:09.140192+00:00
   [2024-01-17, 18:15:10 +08] {standard_task_runner.py:60} INFO - Started 
process 2656 to run task
   [2024-01-17, 18:15:10 +08] {standard_task_runner.py:87} INFO - Running: 
['***', 'tasks', 'run', 'dag_consumer', 'print_triggering_dataset_events', 
'dataset_triggered__2024-01-17T10:15:09.140192+00:00', '--job-id', '1641', 
'--raw', '--subdir', 'DAGS_FOLDER/testdag.py', '--cfg-path', '/tmp/tmpeer76u97']
   [2024-01-17, 18:15:10 +08] {standard_task_runner.py:88} INFO - Job 1641: 
Subtask print_triggering_dataset_events
   [2024-01-17, 18:15:10 +08] {warnings.py:109} WARNING - 
/home/***/.local/lib/python3.8/site-packages/***/settings.py:195: 
DeprecationWarning: The sql_alchemy_conn option in [core] has been moved to the 
sql_alchemy_conn option in [database] - the old setting has been used, but 
please update your config.
     SQL_ALCHEMY_CONN = conf.get("database", "SQL_ALCHEMY_CONN")
   [2024-01-17, 18:15:10 +08] {task_command.py:423} INFO - Running 
<TaskInstance: dag_consumer.print_triggering_dataset_events 
dataset_triggered__2024-01-17T10:15:09.140192+00:00 [running]> on host 
4e6c155ede93
   [2024-01-17, 18:15:10 +08] {taskinstance.py:2481} INFO - Exporting env vars: 
AIRFLOW_CTX_DAG_OWNER='***' AIRFLOW_CTX_DAG_ID='dag_consumer' 
AIRFLOW_CTX_TASK_ID='print_triggering_dataset_events' 
AIRFLOW_CTX_EXECUTION_DATE='2024-01-17T10:15:09.140192+00:00' 
AIRFLOW_CTX_TRY_NUMBER='1' 
AIRFLOW_CTX_DAG_RUN_ID='dataset_triggered__2024-01-17T10:15:09.140192+00:00'
   [2024-01-17, 18:15:10 +08] {logging_mixin.py:188} INFO - Task result:  OK
   [2024-01-17, 18:15:10 +08] {taskinstance.py:2699} ERROR - Task failed with 
exception
   Traceback (most recent call last):
     File 
"/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py",
 line 433, in _execute_task
       result = execute_callable(context=context, **execute_callable_kwargs)
     File 
"/home/airflow/.local/lib/python3.8/site-packages/airflow/decorators/base.py", 
line 242, in execute
       return_value = super().execute(context)
     File 
"/home/airflow/.local/lib/python3.8/site-packages/airflow/operators/python.py", 
line 199, in execute
       return_value = self.execute_callable()
     File 
"/home/airflow/.local/lib/python3.8/site-packages/airflow/operators/python.py", 
line 216, in execute_callable
       return self.python_callable(*self.op_args, **self.op_kwargs)
     File "/opt/airflow/dags/testdag.py", line 38, in 
print_triggering_dataset_events
       print('Task result: ', 
dataset_event.source_task_instance.xcom_pull(task_ids='test'))
     File 
"/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/orm/attributes.py",
 line 487, in __get__
       return self.impl.get(state, dict_)
     File 
"/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/orm/attributes.py",
 line 959, in get
       value = self._fire_loader_callables(state, key, passive)
     File 
"/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/orm/attributes.py",
 line 995, in _fire_loader_callables
       return self.callable_(state, passive)
     File 
"/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/orm/strategies.py",
 line 863, in _load_for_state
       raise orm_exc.DetachedInstanceError(
   sqlalchemy.orm.exc.DetachedInstanceError: Parent instance <DatasetEvent at 
0x7f66e6521eb0> is not bound to a Session; lazy load operation of attribute 
'source_task_instance' cannot proceed (Background on this error at: 
https://sqlalche.me/e/14/bhk3)
   [2024-01-17, 18:15:10 +08] {taskinstance.py:1138} INFO - Marking task as 
FAILED. dag_id=dag_consumer, task_id=print_triggering_dataset_events, 
execution_date=20240117T101509, start_date=20240117T101510, 
end_date=20240117T101510
   [2024-01-17, 18:15:10 +08] {standard_task_runner.py:107} ERROR - Failed to 
execute job 1641 for task print_triggering_dataset_events (Parent instance 
<DatasetEvent at 0x7f66e6521eb0> is not bound to a Session; lazy load operation 
of attribute 'source_task_instance' cannot proceed (Background on this error 
at: https://sqlalche.me/e/14/bhk3); 2656)
   [2024-01-17, 18:15:10 +08] {local_task_job_runner.py:234} INFO - Task exited 
with return code 1
   [2024-01-17, 18:15:10 +08] {taskinstance.py:3281} INFO - 0 downstream tasks 
scheduled from follow-on schedule check
   ```
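
The traceback above shows the failure happening in a lazy load of the `source_task_instance` relationship on a `DatasetEvent` that is no longer bound to a Session. A possible workaround sketch, not a fix, is to read only the plain column attributes of each event (`source_dag_id`, `source_task_id`, `source_run_id`, `source_map_index`) and avoid the relationship entirely. It assumes those columns were already loaded when the events were fetched, which has not been verified against 2.8.0. The `EventSource` tuple and the `collect_event_sources` helper are hypothetical names introduced here for illustration.

```python
from __future__ import annotations

from types import SimpleNamespace
from typing import Any, Iterable, List, Mapping, NamedTuple, Optional


class EventSource(NamedTuple):
    """Hypothetical container for the identifiers of the producing task instance."""

    dag_id: Optional[str]
    task_id: Optional[str]
    run_id: Optional[str]
    map_index: Optional[int]


def collect_event_sources(
    triggering_dataset_events: Mapping[Any, Iterable[Any]],
) -> List[EventSource]:
    """Collect source identifiers from every triggering event.

    Only plain column attributes are read. The `source_task_instance`
    relationship is never touched, so no lazy load is attempted.
    Assumption: the column values are already present on the detached objects.
    """
    sources: List[EventSource] = []
    for _dataset, events in triggering_dataset_events.items():
        for event in events:
            sources.append(
                EventSource(
                    dag_id=event.source_dag_id,
                    task_id=event.source_task_id,
                    run_id=event.source_run_id,
                    map_index=event.source_map_index,
                )
            )
    return sources


if __name__ == "__main__":
    # Stand-in objects instead of real DatasetEvent rows, for a quick local check.
    fake_events = {
        "/test": [
            SimpleNamespace(
                source_dag_id="dag_producer",
                source_task_id="test",
                source_run_id="manual__example",
                source_map_index=-1,
            )
        ],
    }
    for source in collect_event_sources(fake_events):
        print(source)
```

Inside the consumer task, the same helper could be called on `triggering_dataset_events`. Turning the collected identifiers into an XCom lookup is left out on purpose, because the appropriate call depends on the Airflow version.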
   
   # In the UI
   ## With two DataSets - error
   
![2024-01-17_17-50-52](https://github.com/apache/airflow/assets/351289/b9c46eb8-0d69-446e-bc52-b02213bc4683)
   
![2024-01-17_17-51-09](https://github.com/apache/airflow/assets/351289/b18ab8f2-c73e-4ff8-97b8-9855c3391043)
   
![2024-01-17_17-51-27](https://github.com/apache/airflow/assets/351289/ba8c0144-6bed-44c4-84e6-f567221005df0)
   
   ## With one DataSet - OK
   
![2024-01-17_17-55-05](https://github.com/apache/airflow/assets/351289/86d22a63-9694-4149-862c-610193074354)
   
![2024-01-17_17-54-56](https://github.com/apache/airflow/assets/351289/b330cbc0-134e-4337-b6be-38d919c199d1)
   
![2024-01-17_17-54-37](https://github.com/apache/airflow/assets/351289/663d3b2d-5cf2-4eea-8aff-bae3c5cb5472)
   
   
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.