uranusjr commented on code in PR #40084:
URL: https://github.com/apache/airflow/pull/40084#discussion_r1687837706
##########
tests/sensors/test_time_sensor.py:
##########
@@ -63,7 +63,6 @@ def test_task_is_deferred(self):
assert isinstance(exc_info.value.trigger, DateTimeTrigger)
assert exc_info.value.trigger.moment == timezone.datetime(2020, 7, 7, 10)
- assert exc_info.value.method_name == "execute_complete"
Review Comment:
Should we still check the method name is `__trigger_exit__` here?
##########
airflow/models/taskinstance.py:
##########
@@ -83,7 +83,6 @@
AirflowTaskTimeout,
DagRunNotFound,
RemovedInAirflow3Warning,
- TaskDeferralError,
Review Comment:
Can we remove this? (Backward compatibility)
##########
tests/models/test_trigger.py:
##########
@@ -144,6 +151,58 @@ def test_submit_failure(session, create_task_instance):
assert updated_task_instance.next_method == "__fail__"
+@pytest.mark.parametrize(
+ "event_cls, expected",
+ [
+ (TaskSuccessEvent, "success"),
+ (TaskFailedEvent, "failed"),
+ (TaskSkippedEvent, "skipped"),
+ ],
+)
+def test_submit_event_task_end(session, create_task_instance, event_cls, expected):
+ """
+ Tests that events inheriting BaseTaskEndEvent *don't* re-wake their dependent
+ but mark them in the appropriate terminal state and send xcom
+ """
+ # Make a trigger
+ trigger = Trigger(classpath="does.not.matter", kwargs={})
+ trigger.id = 1
+ session.add(trigger)
+ session.commit()
+ # Make a TaskInstance that's deferred and waiting on it
+ task_instance = create_task_instance(
+ session=session, execution_date=timezone.utcnow(), state=State.DEFERRED
+ )
+ task_instance.trigger_id = trigger.id
+ session.commit()
+
+ def get_xcoms(ti):
+ return XCom.get_many(dag_ids=[ti.dag_id], task_ids=[ti.task_id], run_id=ti.run_id).all()
+
+ # now for the real test
+ # first check initial state
+ ti: TaskInstance = session.query(TaskInstance).one()
+ assert ti.state == "deferred"
+ assert get_xcoms(ti) == []
+
+ session.flush()
+ session.expunge_all()
Review Comment:
Why do we need to expunge? (Same question for the call below)
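The behaviour the new test exercises can be modelled in a few lines of plain Python. Per its docstring, events inheriting `BaseTaskEndEvent` do not re-wake the task but move it to a terminal state and push XCom. This is a toy sketch, not Airflow code. `TriggerEvent`, `BaseTaskEndEvent` and the three event classes are re-declared here as stand-ins that reuse the names from the test. `FakeTaskInstance`, `submit_event` and the `"scheduled"` state on the hand-back path are hypothetical assumptions of the model.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any


class TriggerEvent:
    """Stand-in for an ordinary event: execution goes back to the worker."""

    def __init__(self, payload: Any = None) -> None:
        self.payload = payload


class BaseTaskEndEvent(TriggerEvent):
    """Stand-in for an event that finishes the task from the triggerer."""

    task_instance_state = ""  # each subclass sets its terminal state

    def __init__(self, xcoms: dict[str, Any] | None = None) -> None:
        super().__init__()
        self.xcoms = dict(xcoms or {})


class TaskSuccessEvent(BaseTaskEndEvent):
    task_instance_state = "success"


class TaskFailedEvent(BaseTaskEndEvent):
    task_instance_state = "failed"


class TaskSkippedEvent(BaseTaskEndEvent):
    task_instance_state = "skipped"


@dataclass
class FakeTaskInstance:
    """Hypothetical stand-in for a deferred task instance row."""

    state: str = "deferred"
    next_method: str | None = "execute_complete"
    xcom: dict[str, Any] = field(default_factory=dict)


def submit_event(ti: FakeTaskInstance, event: TriggerEvent) -> None:
    """Apply a trigger event to a task instance (toy model)."""
    if ti.state != "deferred":
        return  # only deferred task instances react to trigger events
    if isinstance(event, BaseTaskEndEvent):
        # End directly: set the terminal state and store any XComs;
        # the worker is never re-woken.
        ti.state = event.task_instance_state
        ti.xcom.update(event.xcoms)
    else:
        # Hand back to the worker, which would resume at ti.next_method.
        ti.state = "scheduled"
```

Feeding each of the three parametrized event classes through `submit_event` yields the matching `expected` state from the test table. A plain `TriggerEvent` leaves `next_method` untouched and hands execution back instead.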
##########
docs/apache-airflow/authoring-and-scheduling/deferring.rst:
##########
@@ -253,6 +253,73 @@ These parameters can be mapped using the ``expand`` and ``partial`` methods. Not
trigger_kwargs=[{"moment": timedelta(hours=2)}, {"moment": timedelta(hours=2)}]
)
+Exiting deferred task from Triggers
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ .. versionadded:: 2.10.0
+
+If you want to exit your task directly from the triggerer without going into the worker, you can specific operator level attribute ``end_from_trigger`` with the attributes to your deferrable operator other discussed above.
+
+Triggers can have two options: they can either send execution back to the worker or end the task instance directly. If the trigger ends the task instance itself, the ``method_name`` does not matter and can be ``None``. Otherwise, provide the name of the ``method_name`` that should be used when resuming execution in the task.
Review Comment:
```suggestion
Triggers can have two options: they can either send execution back to the worker or end the task instance directly. If the trigger ends the task instance itself, the ``method_name`` does not matter and can be ``None``. Otherwise, provide ``method_name`` that should be used when resuming execution in the task.
```
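The ``method_name`` rule in the paragraph above can be illustrated with a small sketch. It assumes the worker resumes a deferred task by looking up the named method on the operator and passing the trigger's event as a keyword argument. `HypotheticalSensor` and `resume` are hypothetical names for illustration only; Airflow's actual resume path is not reproduced here.

```python
from __future__ import annotations

from typing import Any


class HypotheticalSensor:
    """Stand-in operator exposing the method named in defer()."""

    def execute_complete(self, context: dict[str, Any], event: Any = None) -> str:
        return f"resumed with {event!r}"


def resume(
    task: object,
    next_method: str | None,
    context: dict[str, Any],
    next_kwargs: dict[str, Any],
) -> Any:
    """Toy model of resuming a deferred task on the worker."""
    if next_method is None:
        # With method_name=None there is nothing to resume: the trigger is
        # expected to have ended the task instance itself.
        raise ValueError("task was expected to end from the trigger")
    # Look the method up by name and pass the event through as a kwarg.
    return getattr(task, next_method)(context, **next_kwargs)
```

In this model, ``None`` is only safe when the trigger always ends the task itself. This matches the reviewer's proposal below to pass `method_name=None` for a sensor that always exits from the trigger.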
##########
docs/apache-airflow/authoring-and-scheduling/deferring.rst:
##########
@@ -253,6 +253,73 @@ These parameters can be mapped using the ``expand`` and ``partial`` methods. Not
trigger_kwargs=[{"moment": timedelta(hours=2)}, {"moment": timedelta(hours=2)}]
)
+Exiting deferred task from Triggers
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ .. versionadded:: 2.10.0
+
+If you want to exit your task directly from the triggerer without going into the worker, you can specific operator level attribute ``end_from_trigger`` with the attributes to your deferrable operator other discussed above.
+
+Triggers can have two options: they can either send execution back to the worker or end the task instance directly. If the trigger ends the task instance itself, the ``method_name`` does not matter and can be ``None``. Otherwise, provide the name of the ``method_name`` that should be used when resuming execution in the task.
+
+.. code-block:: python
+
+ class DateTimeSensorAsync(DateTimeSensor):
+ def __init__(self, end_from_trigger: bool = True, **kwargs) -> None:
+ super().__init__(**kwargs)
+ self.end_from_trigger = end_from_trigger
+
+ def execute(self, context: Context) -> NoReturn:
+ self.defer(
+ method_name="execute_complete",
+ trigger=DateTimeTrigger(
+ moment=timezone.parse(self.target_time), end_from_trigger=self.end_from_trigger
+ ),
+ )
+
+ def execute_complete(self, context, event=None) -> None:
+ return None
+
+``TaskSuccessEvent`` and ``TaskFailureEvent`` are the two events that can be used to end the task instance directly. This marks the task with the state ``task_instance_state`` and optionally pushes xcom if applicable. Here's an example of how to use these events:
+
+.. code-block:: python
+
+
+ class DateTimeTrigger(BaseTrigger):
+ def __init__(self, moment: datetime.datetime, *, end_from_trigger: bool = False):
Review Comment:
Same here, we can make the example simpler so it is easier to understand.
Review Comment:
Maybe we should have a better example than datetime in the first place;
people don’t ever need to implement a datetime sensor because Airflow already
has one.
##########
docs/apache-airflow/authoring-and-scheduling/deferring.rst:
##########
@@ -253,6 +253,73 @@ These parameters can be mapped using the ``expand`` and ``partial`` methods. Not
trigger_kwargs=[{"moment": timedelta(hours=2)}, {"moment": timedelta(hours=2)}]
)
+Exiting deferred task from Triggers
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ .. versionadded:: 2.10.0
+
+If you want to exit your task directly from the triggerer without going into the worker, you can specific operator level attribute ``end_from_trigger`` with the attributes to your deferrable operator other discussed above.
Review Comment:
```suggestion
If you want to exit your task directly from the triggerer without going into the worker, you can specific operator level attribute ``end_from_trigger`` with the attributes to your deferrable operator other discussed above. This can save some resources needed to start a new worker.
```
##########
docs/apache-airflow/authoring-and-scheduling/deferring.rst:
##########
@@ -253,6 +253,73 @@ These parameters can be mapped using the ``expand`` and ``partial`` methods. Not
trigger_kwargs=[{"moment": timedelta(hours=2)}, {"moment": timedelta(hours=2)}]
)
+Exiting deferred task from Triggers
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ .. versionadded:: 2.10.0
+
+If you want to exit your task directly from the triggerer without going into the worker, you can specific operator level attribute ``end_from_trigger`` with the attributes to your deferrable operator other discussed above.
+
+Triggers can have two options: they can either send execution back to the worker or end the task instance directly. If the trigger ends the task instance itself, the ``method_name`` does not matter and can be ``None``. Otherwise, provide the name of the ``method_name`` that should be used when resuming execution in the task.
+
+.. code-block:: python
+
+ class DateTimeSensorAsync(DateTimeSensor):
+ def __init__(self, end_from_trigger: bool = True, **kwargs) -> None:
+ super().__init__(**kwargs)
+ self.end_from_trigger = end_from_trigger
+
+ def execute(self, context: Context) -> NoReturn:
+ self.defer(
+ method_name="execute_complete",
+ trigger=DateTimeTrigger(
+ moment=timezone.parse(self.target_time), end_from_trigger=self.end_from_trigger
+ ),
+ )
+
+ def execute_complete(self, context, event=None) -> None:
+ return None
Review Comment:
```suggestion
class DateTimeSensorAsync(DateTimeSensor):
def __init__(self, **kwargs) -> None:
super().__init__(**kwargs)
self.end_from_trigger = True
def execute(self, context: Context) -> NoReturn:
self.defer(
method_name=None,
trigger=DateTimeTrigger(
moment=timezone.parse(self.target_time),
end_from_trigger=self.end_from_trigger
),
)
```
Let’s make this example simple for people. If users want their sensor to
support both modes, I think they should be smart enough to add the argument.
(We should also add text to tell users that this sensor _always_ exits from
the trigger.)
--
This is an automated message from the Apache Git Service.