shubhamraj-git opened a new pull request, #46042:
URL: https://github.com/apache/airflow/pull/46042

   related: #45966 
   
   XComs can currently only be written from inside a running task, so 
workflows cannot receive values from outside. This PR adds an XCom creation 
API that closes this gap by enabling dynamic, external updates to workflows.
   
   ### Benefits:
   
   1. Enable Long-Running Workflow Updates:
   
   > Current XComs are tied to task execution, limiting flexibility for 
long-running workflows.
   > The API allows dynamic creation or updates of XCom values during 
execution, enabling workflows to adapt to external events or data without 
waiting for task completion.
   
   2. Improve Workflow Interactivity:
   
   > External systems or operators can inject real-time decisions or 
intermediate results directly into workflows.
   > Useful for human-in-the-loop workflows, event-driven processes, and 
external service integrations.
   
   3. Reduce Task Dependency Overhead:
   
   > Avoid placeholder tasks just to generate or update XComs.
   > Streamline DAGs by enabling direct updates without unnecessary task 
execution.
   
   
-------------------------------------------------------------------------------------------------------------------
   
   ### Steps to try out the feature
   
   1. Add the following Dag.
   
   _This Airflow DAG demonstrates how to use XComs for passing data between 
tasks. The first task (wait_and_not_push) waits for 1 minute but does not push 
any XCom. The second task (pull_and_print) attempts to pull an XCom value with 
the key outbound_key1 from the first task, logs it if found, or warns if 
absent._
   
   ```
   from airflow import DAG
   from airflow.providers.standard.operators.python import PythonOperator
   import time
   import logging
   import pendulum
   
   # Default arguments for the DAG
   default_args = {
       'owner': 'airflow',
       'retries': 1
   }
   
   # Initialize the DAG
   with DAG(
       dag_id="xcom_pull_example_dag",
       default_args=default_args,
       description="A DAG demonstrating XCom pull with key 'outbound_key1'",
       start_date=pendulum.datetime(2025, 1, 1, tz="UTC"),
       catchup=False,
       tags=["xcom", "example"],
   ) as dag:
   
       def wait_and_not_push_xcom(**kwargs):
           """
           Task to wait for 1 minute.
           """
           logging.info("Waiting for 1 minute...")
           time.sleep(60)  # Wait for 1 minute
   
       def pull_and_print_xcom(**kwargs):
           """
           Task to pull an XCom entry with key 'outbound_key1' and print its value.
           """
           ti = kwargs["ti"]
           xcom_value = ti.xcom_pull(
               task_ids="wait_and_not_push", key="outbound_key1"
           )
   
           # Compare against None so that falsy values (0, "", []) still count as found
           if xcom_value is not None:
               logging.info(f"Retrieved XCom value for key 'outbound_key1': {xcom_value}")
           else:
               logging.warning("No XCom value found for key 'outbound_key1'!")
   
       # First task: Wait for 1 minute without pushing an XCom entry
       wait_and_not_push = PythonOperator(
           task_id="wait_and_not_push",
           python_callable=wait_and_not_push_xcom
       )
   
       # Second task: Pull the XCom entry and print its value
       pull_and_print = PythonOperator(
           task_id="pull_and_print",
           python_callable=pull_and_print_xcom
       )
   
       # Define the task dependencies
       wait_and_not_push >> pull_and_print
   
   ```
   
   2. Trigger the run.
   3. As soon as the run starts, and while `wait_and_not_push` is still 
sleeping, create an XCom entry with the key `outbound_key1` for that task 
through the API (for example, from Swagger: 
http://localhost:29091/docs#/XCom/create_xcom_entry).
   
   <img width="746" alt="image" src="https://github.com/user-attachments/assets/b7f38435-c0d8-4586-ba71-d3c61b6ad584" />
   
   Now check the logs of the `pull_and_print` task. You should see that the 
**XCom pull was successful**.
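
The XCom creation call made through Swagger above can also be scripted. The following is a minimal sketch, not the definitive client for this API: the route and the payload fields (`key`, `value`) are assumptions and should be checked against the `create_xcom_entry` entry on the Swagger page, the run ID is a placeholder, and authentication is left out. The helper name `build_create_xcom_request` is hypothetical. The sketch only builds the request, so it can be inspected before anything is sent.

```python
import json
import urllib.parse
import urllib.request

# Host and port taken from the Swagger URL in this description.
BASE_URL = "http://localhost:29091"


def build_create_xcom_request(dag_id, run_id, task_id, key, value, base_url=BASE_URL):
    """Build (but do not send) a POST request that creates one XCom entry."""
    # Percent-encode each path segment; run IDs often contain ':' and '+'.
    dag, run, task = (
        urllib.parse.quote(part, safe="") for part in (dag_id, run_id, task_id)
    )
    # ASSUMPTION: this route is illustrative; confirm the real one in Swagger.
    path = f"/public/dags/{dag}/dagRuns/{run}/taskInstances/{task}/xcomEntries"
    # ASSUMPTION: the request body carries the XCom key and a JSON value.
    body = json.dumps({"key": key, "value": value}).encode("utf-8")
    return urllib.request.Request(
        url=base_url + path,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


request = build_create_xcom_request(
    dag_id="xcom_pull_example_dag",
    run_id="manual__2025-01-01T00:00:00+00:00",  # placeholder, use the real run ID
    task_id="wait_and_not_push",
    key="outbound_key1",
    value="hello from outside",
)

# Inspect the request; to send it, call urllib.request.urlopen(request).
print(request.get_method(), request.full_url)
print(request.data.decode("utf-8"))
```

Sending the request while `wait_and_not_push` is still sleeping should make the value available to `pull_and_print`, which is the same effect as the manual Swagger call.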
   
   
   


