jedcunningham commented on code in PR #39912:
URL: https://github.com/apache/airflow/pull/39912#discussion_r1675314291


##########
docs/apache-airflow/authoring-and-scheduling/deferring.rst:
##########
@@ -213,6 +210,49 @@ This is particularly useful when deferring is the only thing the ``execute`` met
             # We have no more work to do here. Mark as complete.
             return
 
+To enable Dynamic Task Mapping support, you can define ``start_from_trigger`` and ``trigger_kwargs`` in the parameter of "__init__". Note that you don't need to define both of them to use this feature, but you do need to use the exact same parameter name. For example, if you define an argument as ``t_kwargs`` and assign this value to ``self.start_trigger_args.trigger_kwargs``, it will not work.
+
+.. code-block:: python
+
+    from datetime import timedelta
+    from typing import Any
+
+    from airflow.sensors.base import BaseSensorOperator
+    from airflow.triggers.temporal import TimeDeltaTrigger
+    from airflow.utils.context import Context
+
+
+    class WaitTwoHourSensor(BaseSensorOperator):
+        start_trigger_args = StartTriggerArgs(
+            trigger_cls="airflow.triggers.temporal.TimeDeltaTrigger",
+            trigger_kwargs={},
+            next_method="execute_complete",
+            timeout=None,
+        )
+
+        def __init__(
+            self,
+            *args: list[Any],
+            trigger_kwargs: dict[str, Any] | None,
+            start_from_trigger: bool,
+            **kwargs: dict[str, Any],
+        ) -> None:
+            super().__init__(*args, **kwargs)
+            self.start_trigger_args.trigger_kwargs = trigger_kwargs
+            self.start_from_trigger = start_from_trigger
+
+        def execute_complete(self, context: Context, event: dict[str, Any] | None = None) -> None:
+            # We have no more work to do here. Mark as complete.
+            return
+
+These parameters can be mapped using the ``expand`` and ``partial`` methods.
+
+.. code-block:: python
+
+    WaitTwoHourSensor.partial(task_id="transform").partial(start_from_trigger=True).expand(

Review Comment:
   Wei and I spent a good amount of time just now talking through this. 
Ultimately, this feature only applies to tasks that are both mapped and 
deferred (already a smallish subset of tasks), and on top of that it is only 
usable by operators that _can_ start from the triggerer. The number of tasks 
that meet all 3 of those conditions is pretty darn small: by some very rough 
math, somewhere in the neighborhood of 0.2% of all deferred tasks (obviously 
this varies by specific use case)!
   
   But if you need it, you need it. I'm not going to hold up this one because 
of the interface, since realistically it'll only be an option for a tiny number 
of users anyway.
   
   For posterity: letting a DAG author control whether a task starts in the 
triggerer or on a worker is a non-goal imo. Ideally the interface would be the 
same regardless of where a task starts, but that has technical hurdles.
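
For illustration only, here is a minimal sketch of why the exact parameter names in the docs above matter, assuming the mapped-task machinery looks the values up by fixed key names. `resolve_trigger_settings` and `RECOGNIZED_KEYS` are hypothetical stand-ins made up for this example, not Airflow code, and the real lookup may differ.

```python
from typing import Any

# Hypothetical stand-in, NOT Airflow's implementation: models a lookup that
# resolves trigger settings from partial/mapped kwargs by fixed key names.
RECOGNIZED_KEYS = ("start_from_trigger", "trigger_kwargs")


def resolve_trigger_settings(
    partial_kwargs: dict[str, Any], mapped_kwargs: dict[str, Any]
) -> dict[str, Any]:
    """Return only the settings found under the recognized key names.

    Mapped values take precedence over partial ones.
    """
    merged = {**partial_kwargs, **mapped_kwargs}
    return {key: merged[key] for key in RECOGNIZED_KEYS if key in merged}


# Exact names: both settings are visible to the lookup.
ok = resolve_trigger_settings(
    {"start_from_trigger": True}, {"trigger_kwargs": {"delta_hours": 2}}
)

# Renamed argument: "t_kwargs" is never consulted, so its value is dropped.
missed = resolve_trigger_settings(
    {"start_from_trigger": True}, {"t_kwargs": {"delta_hours": 2}}
)
```

Under that assumption, a value passed as `t_kwargs` is never seen by the lookup, even if `__init__` later copies it into `self.start_trigger_args.trigger_kwargs`.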



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

