o-nikolas commented on issue #27785:
URL: https://github.com/apache/airflow/issues/27785#issuecomment-1320787388
#### tl;dr: this actually is working, see my simplified example below. BUT
there is an edge case where this _doesn't_ work for manual triggers of the
parent dag.
If you load the dag below into your airflow environment (simplified from what
you provided) and swap in the commented-out `*/5 * * * *` schedule, it will run
every 5 minutes and the XCom value from the task in the subdag will be
correctly read back from the task in the parent dag.
This works because `xcom_pull` now adds `run_id` to the XCOM query filter (see
this PR: #19825). And when the parent dag is _scheduled_ both the parent and
subdag have the same dag run id (something like `scheduled__<datetime>`).
BUT, if you _manually_ trigger the parent dag, its run id is now
`manual__<datetime>` while the subdag's is still `scheduled__<datetime>`. The
filter on `run_id` therefore excludes all XCom values from the child dag,
since they were written under a different dag run id. This is a legitimate
regression that I think we should fix (@potiuk do you agree?).
I think there may have been some bugs in your dag code, but even if there
weren't, I assume you were manually triggering the parent dag, so the XCom
could not be read successfully for the reason above.
```python
from __future__ import annotations

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.subdag import SubDagOperator

DAG_NAME = "test_xcom"
SUB_DAG_NAME = "subdag_writer"
SUB_DAG_PUSH_TASK = "push_xcom_value_sub_dag"


def sub_dag() -> DAG:
    with DAG(
        dag_id=f"{DAG_NAME}.{SUB_DAG_NAME}",
        start_date=datetime(2022, 1, 1),
        catchup=False,
        schedule="* * * * *",
    ) as subdag:

        def extract(**kwargs):
            print("hello Im running")
            return "xcom_value_test"

        PythonOperator(
            task_id=SUB_DAG_PUSH_TASK,
            python_callable=extract,
            dag=subdag,
            do_xcom_push=True,
        )
    return subdag


with DAG(
    dag_id=DAG_NAME,
    start_date=datetime(2022, 1, 1),
    catchup=False,
    schedule="@once",
    # schedule="*/5 * * * *",
) as dag:

    def push_xcom(**kwargs):
        return "test_value"

    def read_xcom(ti, **kwargs):
        return_value = ti.xcom_pull(
            task_ids="push_xcom_value_sub_dag",
            dag_id="test_xcom.subdag_writer",
            key="return_value",
        )
        print(f"I got this from xcom: {return_value}")

    push_xcom_value = PythonOperator(
        task_id="push_xcom_value",
        python_callable=push_xcom,
    )

    subdag = sub_dag()
    bash_push_sub_dag = SubDagOperator(
        task_id=SUB_DAG_NAME,
        subdag=subdag,
        dag=dag,
    )

    read_xcom_value = PythonOperator(
        task_id="read_xcom_value",
        python_callable=read_xcom,
    )

    push_xcom_value >> bash_push_sub_dag >> read_xcom_value
```