Lee-W commented on code in PR #59604:
URL: https://github.com/apache/airflow/pull/59604#discussion_r2670642374


##########
task-sdk/src/airflow/sdk/execution_time/task_runner.py:
##########
@@ -1648,7 +1678,17 @@ def main():
     SUPERVISOR_COMMS = CommsDecoder[ToTask, ToSupervisor](log=log)
 
     try:
-        ti, context, log = startup()
+        try:
+            ti, context, log = startup()
+        except AirflowRescheduleException as reschedule:
+            log.info("Rescheduling task during startup, marking task as UP_FOR_RESCHEDULE")

Review Comment:
   ```suggestion
               log.warning("Rescheduling task during startup, marking task as UP_FOR_RESCHEDULE")
   ```



##########
airflow-core/src/airflow/config_templates/config.yml:
##########
@@ -1677,6 +1677,33 @@ workers:
       type: float
       example: ~
       default: "60.0"
+    task_startup_timeout_retries:
+      description: |
+        Maximum number of times a task will be rescheduled if the worker fails to
+        load the Dag or task definition during startup.
+
+        This situation can occur due to transient infrastructure issues such as
+        missing Dag files, temporary filesystem or network problems, or bundle
+        synchronization delays. Rescheduling in this case does not count as a
+        task retry.
+
+        Set this value to 0 to disable rescheduling and fail the task immediately
+        on startup failures.
+      version_added: 3.1.6

Review Comment:
   updated to 3.1.7



##########
task-sdk/src/airflow/sdk/execution_time/task_runner.py:
##########
@@ -669,6 +670,33 @@ def _xcom_push_to_db(ti: RuntimeTaskInstance, key: str, value: Any) -> None:
     )
 
 
+def _maybe_reschedule_startup_failure(
+    *,
+    ti_context: TIRunContext,
+    log: Logger,
+) -> None:
+    """
+    Attempt to reschedule the task when a startup failure occurs.
+
+    This does not count as a retry. If the reschedule limit is exceeded, this function
+    returns and the caller should fail the task.
+    """
+    max_reschedules = conf.getint("workers", "task_startup_timeout_retries", fallback=3)
+    reschedule_delay = conf.getint("workers", "task_startup_timeout", fallback=60)
+
+    reschedule_count = int(getattr(ti_context, "task_reschedule_count", 0) or 0)

Review Comment:
   to avoid infinite reschedules
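
The bound this comment refers to can be sketched roughly as follows. This is a minimal illustration, not code from the PR: `should_reschedule` and its parameters are hypothetical names, and the strict `<` comparison is an assumption about how the limit is applied.

```python
from typing import Optional


def should_reschedule(reschedule_count: Optional[int], max_reschedules: int) -> bool:
    """Return True while the startup-failure reschedule budget is not exhausted.

    Hypothetical helper: the real check lives inside the PR's
    `_maybe_reschedule_startup_failure` and may differ in detail.
    """
    # Treat a missing or None count as zero, mirroring the `or 0` guard in the diff.
    count = int(reschedule_count or 0)
    # With a limit of 0 this is always False, so the caller fails the task at once.
    return count < max_reschedules
```

Under these assumptions, a limit of 3 allows reschedules while the count is 0, 1, or 2. Once the count reaches the limit the helper returns False, so the loop cannot continue indefinitely.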



##########
airflow-core/src/airflow/ti_deps/deps/ready_to_reschedule.py:
##########
@@ -43,9 +43,18 @@ def _get_dep_statuses(self, ti, session, dep_context):
         in the future.
         """
         if (
+            # Exclude infrastructure-triggered reschedules.
+            # When a task is rescheduled due to a startup failure (e.g. the task runner
+            # cannot load the Dag or task), the task instance will be set
+            # in UP_FOR_RESCHEDULE state. In this case, we'll ignore the task's
+            # "reschedule" attribute check, since the reschedule was not initiated by the
+            # operator itself.
+            ti.state is None

Review Comment:
   The original logic skipped all non-mapped tasks without a `.reschedule`
   property. I updated it to check
   `ti.state != TaskInstanceState.UP_FOR_RESCHEDULE` so that the behavior
   remains the same, except for the case where the state is explicitly set to
   `UP_FOR_RESCHEDULE`, which is expected in the new `task_runner` logic we
   added in this PR.
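
As a rough sketch of the condition described above, the short-circuit might look like the following. `State` is a stand-in for `TaskInstanceState`, and `skips_reschedule_check` with its boolean parameters is a hypothetical name chosen for illustration. The actual expression in `ready_to_reschedule.py` may be ordered or written differently.

```python
from enum import Enum


class State(Enum):
    """Stand-in for TaskInstanceState, limited to what the sketch needs."""

    RUNNING = "running"
    UP_FOR_RESCHEDULE = "up_for_reschedule"


def skips_reschedule_check(state, is_mapped: bool, has_reschedule_attr: bool) -> bool:
    """Return True when the dep can pass early without inspecting reschedule rows.

    Hypothetical helper: a task already in UP_FOR_RESCHEDULE is never skipped,
    even if its operator has no `reschedule` attribute.
    """
    return (
        state != State.UP_FOR_RESCHEDULE
        and not is_mapped
        and not has_reschedule_attr
    )
```

With this shape, a non-mapped task without `reschedule` still passes early in any other state, which matches the previous behavior. Only the infrastructure-triggered `UP_FOR_RESCHEDULE` case falls through to the full check.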



##########
airflow-core/src/airflow/models/dagrun.py:
##########
@@ -1556,7 +1556,10 @@ def _expand_mapped_task_if_needed(ti: TI) -> Iterable[TI] | None:
             if TYPE_CHECKING:
                 assert isinstance(schedulable.task, Operator)
             old_state = schedulable.state
-            if not schedulable.are_dependencies_met(session=session, dep_context=dep_context):
+            if not schedulable.are_dependencies_met(
+                session=session,
+                dep_context=dep_context,
+            ):

Review Comment:
   reverted


