potiuk commented on code in PR #45158:
URL: https://github.com/apache/airflow/pull/45158#discussion_r1895042920
##########
docs/apache-airflow/authoring-and-scheduling/deferring.rst:
##########
@@ -397,6 +397,44 @@ In the above example, the trigger will end the task
instance directly if ``end_f
.. note::
Exiting from the trigger works only when listeners are not integrated for
the deferrable operator. Currently, when deferrable operator has the
``end_from_trigger`` attribute set to ``True`` and listeners are integrated it
raises an exception during parsing to indicate this limitation. While writing
the custom trigger, ensure that the trigger is not set to end the task instance
directly if the listeners are added from plugins. If the ``end_from_trigger``
attribute is changed to different attribute by author of trigger, the DAG
parsing would not raise any exception and the listeners dependent on this task
would not work. This limitation will be addressed in future releases.
+Handling XComs for Deferred Tasks
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+When working with deferred tasks that exit directly from triggers, you may
need to push XCom values for subsequent tasks in the pipeline. The method
``_push_xcoms_if_necessary`` is responsible for pushing these values. Below is
an example of how this can be implemented:
+
+.. code-block:: python
+
+ def _push_xcoms_if_necessary(self, *, task_instance: TaskInstance) -> None:
+ """
+ Push XComs if required based on the task's state and the provided
events.
+ """
+ if task_instance.state == TaskInstanceState.SUCCESS:
+ task_instance.xcom_push(
+ key="result", value={"status": "success", "message": "Task
completed successfully"}
+ )
+ elif task_instance.state == TaskInstanceState.FAILED:
+ task_instance.xcom_push(key="result", value={"status": "failure",
"message": "Task failed"})
+
+You can call this method within your trigger to manage XComs when the task
instance ends directly from the trigger. Here's an example:
+
+.. code-block:: python
+
+ class WaitFiveHourTrigger(BaseTrigger):
+ def __init__(self, duration: timedelta, *, end_from_trigger: bool =
False):
+ super().__init__()
+ self.duration = duration
+ self.end_from_trigger = end_from_trigger
+
+ async def run(self) -> AsyncIterator[TriggerEvent]:
+ await asyncio.sleep(self.duration.total_seconds())
+ if self.end_from_trigger:
+ task_instance = ... # Get the relevant TaskInstance
Review Comment:
Hmm. I am not sure how to get the task instance here . I am not sure if you
can do it automatically ?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]