Lee-W commented on code in PR #40084:
URL: https://github.com/apache/airflow/pull/40084#discussion_r1681107357


##########
airflow/dag_processing/processor.py:
##########
@@ -796,8 +818,16 @@ def _execute_task_callbacks(self, dagbag: DagBag | None, request: TaskCallbackRe
         if task:
             ti.refresh_from_task(task)
 
-        ti.handle_failure(error=request.msg, test_mode=self.UNIT_TEST_MODE, session=session)
-        self.log.info("Executed failure callback for %s in state %s", ti, ti.state)
+        if callback_type is TaskInstanceState.SUCCESS:
+            context = ti.get_template_context(session=session)
+            if not ti.task:
+                return
+            callbacks = ti.task.on_success_callback
+            _run_finished_callback(callbacks=callbacks, context=context)
+            self.log.info("Executed callback for %s in state %s", ti, ti.state)
+        elif not is_remote or callback_type is TaskInstanceState.FAILED:

Review Comment:
   Is `not is_remote` required here? It looks like neither `is_remote` nor 
`callback_type` changes after line 780. Did I miss anything?



##########
airflow/sensors/time_delta.py:
##########
@@ -73,14 +73,10 @@ def execute(self, context: Context) -> bool | NoReturn:
             # If the target datetime is in the past, return immediately
             return True
         try:
-            trigger = DateTimeTrigger(moment=target_dttm)
+            trigger = DateTimeTrigger(moment=target_dttm, end_task=True)
         except (TypeError, ValueError) as e:
             if self.soft_fail:
                 raise AirflowSkipException("Skipping due to soft_fail is set to True.") from e
             raise
 
-        self.defer(trigger=trigger, method_name="execute_complete")
-
-    def execute_complete(self, context, event=None) -> None:
-        """Execute for when the trigger fires - return immediately."""
-        return None
+        self.defer(trigger=trigger, method_name=TaskDeferred.TRIGGER_EXIT)

Review Comment:
   `TaskDeferred.TRIGGER_EXIT` looks like just a special value indicating that 
we want to end the task from the trigger. I wonder whether it really needs to be 
part of the `TaskDeferred` exception. Making it a dedicated 
class/lambda/variable/enum makes more sense to me.
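
   To illustrate, here is a minimal sketch of the enum option. The names 
`DeferralAction` and `should_resume_on_worker` are hypothetical and do not exist 
in Airflow; the point is only that the sentinel can live outside the exception 
class and be compared by identity.

```python
from __future__ import annotations

import enum


class DeferralAction(enum.Enum):
    """Hypothetical sentinel type; not part of Airflow."""

    # Signals that the trigger should finish the task instead of resuming it.
    TRIGGER_EXIT = "__trigger_exit__"


def should_resume_on_worker(method_name: str | DeferralAction) -> bool:
    """Return True when a real method name was given and the task must resume."""
    return method_name is not DeferralAction.TRIGGER_EXIT
```

   An enum member cannot collide with a real method name the way a plain string 
constant could, which is why I lean towards it over a bare variable.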



##########
airflow/sensors/time_sensor.py:
##########
@@ -72,13 +73,8 @@ def __init__(self, *, target_time: datetime.time, **kwargs) -> None:
 
         self.target_datetime = timezone.convert_to_utc(aware_time)
 
-    def execute(self, context: Context) -> NoReturn:
-        trigger = DateTimeTrigger(moment=self.target_datetime)
+    def execute(self, context: Context) -> None:
         self.defer(
-            trigger=trigger,
-            method_name="execute_complete",
+            method_name=TaskDeferred.TRIGGER_EXIT,
+            trigger=DateTimeTrigger(moment=self.target_datetime, end_task=True),

Review Comment:
   If we already set `method_name` to `TaskDeferred.TRIGGER_EXIT`, what's the 
purpose of `end_task`?



##########
airflow/triggers/temporal.py:
##########
@@ -34,9 +34,13 @@ class DateTimeTrigger(BaseTrigger):
     a few seconds.
 
     The provided datetime MUST be in UTC.
+
+    :param moment: when to yield event
+    :param end_task: whether the trigger should mark the task successful after time condition
+        reached or resume the task after time condition reached.
     """
 
-    def __init__(self, moment: datetime.datetime):
+    def __init__(self, moment: datetime.datetime, *, end_task=False):

Review Comment:
   nitpick: missing type annotation
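
   Something along these lines, shown on a stand-in class so the snippet runs 
without Airflow installed. `DateTimeTriggerSketch` is a hypothetical name; only 
the `end_task: bool = False` annotation is the actual suggestion.

```python
import datetime


class DateTimeTriggerSketch:
    """Stand-in for the trigger; only the constructor signature matters here."""

    def __init__(self, moment: datetime.datetime, *, end_task: bool = False) -> None:
        self.moment = moment
        self.end_task = end_task
```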



##########
airflow/sensors/time_sensor.py:
##########
@@ -40,11 +41,11 @@ class TimeSensor(BaseSensorOperator):
 
     """
 
-    def __init__(self, *, target_time: datetime.time, **kwargs) -> None:
+    def __init__(self, *, target_time, **kwargs):
         super().__init__(**kwargs)
         self.target_time = target_time
 
-    def poke(self, context: Context) -> bool:
+    def poke(self, context: Context):

Review Comment:
   missing type



##########
airflow/sensors/time_sensor.py:
##########
@@ -40,11 +40,11 @@ class TimeSensor(BaseSensorOperator):
 
     """
 
-    def __init__(self, *, target_time: datetime.time, **kwargs) -> None:
+    def __init__(self, *, target_time, **kwargs):

Review Comment:
   The typing is still missing here



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

