shubhamraj-git opened a new pull request, #46042:
URL: https://github.com/apache/airflow/pull/46042
related: #45966
XComs can currently be written only from within task execution, which rules
out dynamic, external updates to workflows. This PR fills that gap by adding
an XCom creation API.
### Benefits:
1. Enable Long-Running Workflow Updates:
> Currently, XComs are tied to task execution, which limits flexibility for
long-running workflows.
> The API allows dynamic creation or updates of XCom values during
execution, enabling workflows to adapt to external events or data without
waiting for task completion.
2. Improve Workflow Interactivity:
> External systems or operators can inject real-time decisions or
intermediate results directly into workflows.
> Useful for human-in-the-loop workflows, event-driven processes, and
external service integrations.
3. Reduce Task Dependency Overhead:
> Avoid placeholder tasks just to generate or update XComs.
> Streamline DAGs by enabling direct updates without unnecessary task
execution.
-------------------------------------------------------------------------------------------------------------------
### Steps to try out the feature
1. Add the following DAG.
_This Airflow DAG demonstrates how to use XComs for passing data between
tasks. The first task (wait_and_not_push) waits for 1 minute but does not push
any XCom. The second task (pull_and_print) attempts to pull an XCom value with
the key outbound_key1 from the first task, logs it if found, or warns if
absent._
```
from airflow import DAG
from airflow.providers.standard.operators.python import PythonOperator
import time
import logging
import pendulum

# Default arguments for the DAG
default_args = {
    'owner': 'airflow',
    'retries': 1,
}

# Initialize the DAG
with DAG(
    dag_id="xcom_pull_example_dag",
    default_args=default_args,
    description="A DAG demonstrating XCom pull with key 'outbound_key1'",
    start_date=pendulum.datetime(2025, 1, 1, tz="UTC"),
    catchup=False,
    tags=["xcom", "example"],
) as dag:

    def wait_and_not_push_xcom(**kwargs):
        """
        Task to wait for 1 minute without pushing any XCom.
        """
        logging.info("Waiting for 1 minute...")
        time.sleep(60)  # Wait for 1 minute

    def pull_and_print_xcom(**kwargs):
        """
        Task to pull an XCom entry with key 'outbound_key1' and log its value.
        """
        ti = kwargs["ti"]
        xcom_value = ti.xcom_pull(task_ids="wait_and_not_push", key="outbound_key1")
        # Compare against None so that falsy values (0, "", []) still count as found
        if xcom_value is not None:
            logging.info(f"Retrieved XCom value for key 'outbound_key1': {xcom_value}")
        else:
            logging.warning("No XCom value found for key 'outbound_key1'!")

    # First task: wait for 1 minute without pushing an XCom entry
    wait_and_not_push = PythonOperator(
        task_id="wait_and_not_push",
        python_callable=wait_and_not_push_xcom,
    )

    # Second task: pull the XCom entry and log its value
    pull_and_print = PythonOperator(
        task_id="pull_and_print",
        python_callable=pull_and_print_xcom,
    )

    # Define the task dependencies
    wait_and_not_push >> pull_and_print
```
2. Trigger the run.
3. As soon as the run starts, create an XCom entry through the API for task
`wait_and_not_push` with key `outbound_key1` (this can be done from Swagger:
http://localhost:29091/docs#/XCom/create_xcom_entry)
<img width="746" alt="image"
src="https://github.com/user-attachments/assets/b7f38435-c0d8-4586-ba71-d3c61b6ad584"
/>
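
The same call can be scripted instead of going through Swagger. The following is a minimal sketch only: the `/public` prefix, the route layout, and the body fields (`key`, `value`, `map_index`) are assumptions that should be checked against the Swagger page linked above, and `build_create_xcom_request` and `send` are hypothetical helper names. Authentication is omitted and depends on how the API server is configured.

```python
import json
import urllib.parse
import urllib.request

# Host and port come from the Swagger link above; the "/public" prefix is an assumption.
BASE_URL = "http://localhost:29091/public"


def build_create_xcom_request(dag_id, dag_run_id, task_id, key, value, map_index=-1):
    """Build (but do not send) a POST request that creates an XCom entry.

    The route and the body field names are assumptions, not confirmed by this PR text.
    """
    # Run IDs often contain ':' and '+', so every path segment is percent-encoded.
    segments = [urllib.parse.quote(s, safe="") for s in (dag_id, dag_run_id, task_id)]
    url = (
        f"{BASE_URL}/dags/{segments[0]}/dagRuns/{segments[1]}"
        f"/taskInstances/{segments[2]}/xcomEntries"
    )
    body = json.dumps({"key": key, "value": value, "map_index": map_index})
    return urllib.request.Request(
        url,
        data=body.encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def send(request):
    """Send the request and return the HTTP status code."""
    with urllib.request.urlopen(request) as response:
        return response.status


# Example (placeholder run ID; replace it with the ID of the triggered run):
# request = build_create_xcom_request(
#     "xcom_pull_example_dag", "<dag_run_id>", "wait_and_not_push",
#     key="outbound_key1", value="hello from the API",
# )
# send(request)
```

The request has to be sent while `wait_and_not_push` is still sleeping, so that the entry exists before `pull_and_print` starts.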
Now check the logs of the `pull_and_print` task. You can see that the **XCom pull was successful**.