ephraimbuddy commented on pull request #16823:
URL: https://github.com/apache/airflow/pull/16823#issuecomment-896697397
@ashb, I have just discovered that the XCom key can already be changed with
the current TaskFlow API:
```python
from airflow import DAG
from airflow.utils.dates import days_ago

value_1 = [1, 2, 3]
value_2 = {'a': 'b'}

dag = DAG(
    'example_xcom',
    schedule_interval="@once",
    start_date=days_ago(2),
    tags=['example'],
)


@dag.task()
def push(**kwargs):
    """Pushes an XCom without a specific target"""
    kwargs['ti'].xcom_push(key='value from pusher 1', value=value_1)


@dag.task()
def push_by_returning():
    """Pushes an XCom without a specific target, just by returning it"""
    return value_2


@dag.task()
def puller(data1, data2, **kwargs):
    """Pull all previously pushed XComs and check if the pushed values match
    the pulled values."""
    ti = kwargs['ti']

    # get value_1
    pulled_value_1 = ti.xcom_pull(key=None, task_ids='push')
    if pulled_value_1 != value_1:
        raise ValueError(f'The two values differ {pulled_value_1} and {value_1}')

    # get value_2
    pulled_value_2 = data2
    if pulled_value_2 != value_2:
        raise ValueError(f'The two values differ {pulled_value_2} and {value_2}')

    # get both value_1 and value_2
    pulled_value_1, pulled_value_2 = ti.xcom_pull(
        key=None, task_ids=['push', 'push_by_returning']
    )
    if pulled_value_1 != value_1:
        raise ValueError(f'The two values differ {pulled_value_1} and {value_1}')
    if pulled_value_2 != value_2:
        raise ValueError(f'The two values differ {pulled_value_2} and {value_2}')


puller(push(), push_by_returning())
```
The decorated function accepts `**kwargs`, so we can call `ti.xcom_push` with
a custom key and also use `ti.xcom_pull`.
Closing this PR!