Lee-W commented on code in PR #40084:
URL: https://github.com/apache/airflow/pull/40084#discussion_r1688152827


##########
airflow/models/dagbag.py:
##########
@@ -512,6 +514,16 @@ def _bag_dag(self, *, dag, root_dag, recursive):
             settings.dag_policy(dag)
 
             for task in dag.tasks:
+                # The listeners are not supported when ending a task via a 
trigger on asynchronous operators.
+                if getattr(task, "end_from_trigger", False) and 
get_listener_manager().has_listeners:
+                    raise AirflowException(
+                        "Listeners are not supported with 
end_from_trigger=True for async operators. "

Review Comment:
   ```suggestion
                           "Listeners are not supported with 
end_from_trigger=True for deferrable operators. "
   ```
   
   not sure whether "deferrable operators" is the best choice, but the term 
"async op" doesn't seem to be used that often in airflow



##########
airflow/models/taskinstance.py:
##########
@@ -94,7 +94,6 @@
 from airflow.models.dagbag import DagBag
 from airflow.models.dataset import DatasetAliasModel, DatasetModel
 from airflow.models.log import Log
-from airflow.models.mappedoperator import MappedOperator

Review Comment:
   Why do we need to make this change?



##########
docs/apache-airflow/authoring-and-scheduling/deferring.rst:
##########
@@ -253,6 +253,59 @@ These parameters can be mapped using the ``expand`` and 
``partial`` methods. Not
         trigger_kwargs=[{"moment": timedelta(hours=2)}, {"moment": 
timedelta(hours=2)}]
     )
 
+Exiting deferred task from Triggers
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ .. versionadded:: 2.10.0
+
+If you want to exit your task directly from the triggerer without going into 
the worker, you can specific operator level attribute ``end_from_trigger`` with 
the attributes to your deferrable operator other discussed above. This can save 
some resources needed to start a new worker.

Review Comment:
   ```suggestion
   If you want to exit your task directly from the triggerer without going into 
the worker, you can specify the instance level attribute ``end_from_trigger`` 
with the attributes of your deferrable operator, as discussed above. This can 
save some resources needed to start a new worker.
   ```
   
   I feel it's not actually operator-level. The example looks like an 
instance-level attribute



##########
airflow/sensors/date_time.py:
##########
@@ -85,18 +85,21 @@ class DateTimeSensorAsync(DateTimeSensor):
     It is a drop-in replacement for DateTimeSensor.
 
     :param target_time: datetime after which the job succeeds. (templated)
+    :param end_from_trigger: End the task directly from the triggerer without 
going into the worker.
     """
 
-    def __init__(self, **kwargs) -> None:
+    def __init__(self, end_from_trigger: bool = False, **kwargs) -> None:

Review Comment:
   ```suggestion
       def __init__(self, *, end_from_trigger: bool = False, **kwargs) -> None:
   ```



##########
tests/sensors/test_time_sensor.py:
##########
@@ -63,7 +63,6 @@ def test_task_is_deferred(self):
 
         assert isinstance(exc_info.value.trigger, DateTimeTrigger)
         assert exc_info.value.trigger.moment == timezone.datetime(2020, 7, 7, 
10)
-        assert exc_info.value.method_name == "execute_complete"

Review Comment:
   If that's the case, we probably shouldn't remove this line. It's still the 
correct value, but it just does not run. Not sure whether we have a way to 
check it's not running here. I guess not. probably another test case



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to