This is an automated email from the ASF dual-hosted git repository.

weilee pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v3-1-test by this push:
     new fddf4a72406 Reschedule tasks on worker startup Dag load failures instead of exiting (#59604) (#60926)
fddf4a72406 is described below

commit fddf4a72406fda039dea813c0ff6e58dd0cd2891
Author: Wei Lee <[email protected]>
AuthorDate: Thu Jan 22 18:16:22 2026 +0800

    Reschedule tasks on worker startup Dag load failures instead of exiting (#59604) (#60926)
---
 .../src/airflow/config_templates/config.yml        | 27 +++++++
 airflow-core/src/airflow/models/taskinstance.py    | 12 +++
 .../src/airflow/ti_deps/deps/base_ti_dep.py        |  3 +-
 .../airflow/ti_deps/deps/ready_to_reschedule.py    | 36 +++++----
 .../ti_deps/deps/test_ready_to_reschedule_dep.py   | 21 +++---
 .../src/airflow/sdk/execution_time/task_runner.py  | 53 +++++++++++--
 .../task_sdk/execution_time/test_task_runner.py    | 87 +++++++++++++++++++++-
 7 files changed, 205 insertions(+), 34 deletions(-)

diff --git a/airflow-core/src/airflow/config_templates/config.yml 
b/airflow-core/src/airflow/config_templates/config.yml
index d6f9d332655..169e2589a62 100644
--- a/airflow-core/src/airflow/config_templates/config.yml
+++ b/airflow-core/src/airflow/config_templates/config.yml
@@ -1603,6 +1603,33 @@ workers:
       type: float
       example: ~
       default: "60.0"
+    missing_dag_retires:
+      description: |
+        Maximum number of times a task will be rescheduled if the worker fails to
+        load the Dag or task definition during startup.
+
+        This situation can occur due to transient infrastructure issues such as
+        missing Dag files, temporary filesystem or network problems, or bundle
+        synchronization delays. Rescheduling in this case does not count as a
+        task retry.
+
+        Set this value to 0 to disable rescheduling and fail the task immediately
+        on startup failures.
+      version_added: 3.1.7
+      type: integer
+      example: ~
+      default: "3"
+    missing_dag_retry_delay:
+      description: |
+        Delay in seconds before a task is rescheduled after a worker startup
+        failure caused by an inability to load the Dag or task definition.
+
+        This delay is applied when the task runner requests the scheduler to
+        reschedule the task instance in UP_FOR_RESCHEDULE state.
+      version_added: 3.1.7
+      type: integer
+      example: ~
+      default: "60"
 api_auth:
   description: Settings relating to authentication on the Airflow APIs
   options:
diff --git a/airflow-core/src/airflow/models/taskinstance.py 
b/airflow-core/src/airflow/models/taskinstance.py
index 29f2d98eefa..94b9179c6e2 100644
--- a/airflow-core/src/airflow/models/taskinstance.py
+++ b/airflow-core/src/airflow/models/taskinstance.py
@@ -87,6 +87,7 @@ from airflow.settings import task_instance_mutation_hook
 from airflow.stats import Stats
 from airflow.ti_deps.dep_context import DepContext
 from airflow.ti_deps.dependencies_deps import REQUEUEABLE_DEPS, RUNNING_DEPS
+from airflow.ti_deps.deps.ready_to_reschedule import ReadyToRescheduleDep
 from airflow.utils.helpers import prune_dict
 from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.utils.net import get_hostname
@@ -898,6 +899,17 @@ class TaskInstance(Base, LoggingMixin):
         :param verbose: whether log details on failed dependencies on info or 
debug log level
         """
         dep_context = dep_context or DepContext()
+        if self.state == TaskInstanceState.UP_FOR_RESCHEDULE:
+            # This DepContext is used when a task instance is in UP_FOR_RESCHEDULE state.
+            #
+            # Tasks can be put into UP_FOR_RESCHEDULE by the task runner itself (e.g. when
+            # the worker cannot load the Dag or task). In this case, the scheduler must respect
+            # the task instance's reschedule_date before scheduling it again.
+            #
+            # ReadyToRescheduleDep is the only dependency that enforces this time-based gating.
+            # We therefore extend the normal scheduling dependency set with it, instead of
+            # modifying the global scheduler dependencies.
+            dep_context.deps.add(ReadyToRescheduleDep())
         failed = False
         verbose_aware_logger = self.log.info if verbose else self.log.debug
         for dep_status in 
self.get_failed_dep_statuses(dep_context=dep_context, session=session):
diff --git a/airflow-core/src/airflow/ti_deps/deps/base_ti_dep.py 
b/airflow-core/src/airflow/ti_deps/deps/base_ti_dep.py
index 4d54783b020..bc4fb303117 100644
--- a/airflow-core/src/airflow/ti_deps/deps/base_ti_dep.py
+++ b/airflow-core/src/airflow/ti_deps/deps/base_ti_dep.py
@@ -17,13 +17,14 @@
 # under the License.
 from __future__ import annotations
 
-from collections.abc import Iterator
 from typing import TYPE_CHECKING, NamedTuple
 
 from airflow.ti_deps.dep_context import DepContext
 from airflow.utils.session import provide_session
 
 if TYPE_CHECKING:
+    from collections.abc import Iterator
+
     from sqlalchemy.orm import Session
 
     from airflow.models.taskinstance import TaskInstance
diff --git a/airflow-core/src/airflow/ti_deps/deps/ready_to_reschedule.py 
b/airflow-core/src/airflow/ti_deps/deps/ready_to_reschedule.py
index 501b1574205..2855fa25499 100644
--- a/airflow-core/src/airflow/ti_deps/deps/ready_to_reschedule.py
+++ b/airflow-core/src/airflow/ti_deps/deps/ready_to_reschedule.py
@@ -17,13 +17,23 @@
 # under the License.
 from __future__ import annotations
 
+from typing import TYPE_CHECKING
+
 from airflow._shared.timezones import timezone
-from airflow.executors.executor_loader import ExecutorLoader
 from airflow.models.taskreschedule import TaskReschedule
 from airflow.ti_deps.deps.base_ti_dep import BaseTIDep
 from airflow.utils.session import provide_session
 from airflow.utils.state import TaskInstanceState
 
+if TYPE_CHECKING:
+    from collections.abc import Iterator
+
+    from sqlalchemy.orm import Session
+
+    from airflow.models.taskinstance import TaskInstance
+    from airflow.ti_deps.dep_context import DepContext
+    from airflow.ti_deps.deps.base_ti_dep import TIDepStatus
+
 
 class ReadyToRescheduleDep(BaseTIDep):
     """Determines whether a task is ready to be rescheduled."""
@@ -34,27 +44,22 @@ class ReadyToRescheduleDep(BaseTIDep):
     RESCHEDULEABLE_STATES = {TaskInstanceState.UP_FOR_RESCHEDULE, None}
 
     @provide_session
-    def _get_dep_statuses(self, ti, session, dep_context):
+    def _get_dep_statuses(
+        self,
+        ti: TaskInstance,
+        session: Session,
+        dep_context: DepContext,
+    ) -> Iterator[TIDepStatus]:
         """
         Determine whether a task is ready to be rescheduled.
 
-        Only tasks in NONE state with at least one row in task_reschedule 
table are
+        Only tasks in NONE or UP_FOR_RESCHEDULE state with at least one row in 
task_reschedule table are
         handled by this dependency class, otherwise this dependency is 
considered as passed.
         This dependency fails if the latest reschedule request's reschedule 
date is still
         in the future.
         """
         from airflow.models.mappedoperator import MappedOperator
 
-        is_mapped = isinstance(ti.task, MappedOperator)
-        executor, _ = ExecutorLoader.import_default_executor_cls()
-        if (
-            # Mapped sensors don't have the reschedule property (it can only 
be calculated after unmapping),
-            # so we don't check them here. They are handled below by checking 
TaskReschedule instead.
-            not is_mapped and not getattr(ti.task, "reschedule", False)
-        ):
-            yield self._passing_status(reason="Task is not in reschedule 
mode.")
-            return
-
         if dep_context.ignore_in_reschedule_period:
             yield self._passing_status(
                 reason="The context specified that being in a reschedule 
period was permitted."
@@ -75,14 +80,13 @@ class ReadyToRescheduleDep(BaseTIDep):
         if not next_reschedule_date:
             # Because mapped sensors don't have the reschedule property, 
here's the last resort
             # and we need a slightly different passing reason
-            if is_mapped:
+            if isinstance(ti.task, MappedOperator):
                 yield self._passing_status(reason="The task is mapped and not 
in reschedule mode")
                 return
             yield self._passing_status(reason="There is no reschedule request 
for this task instance.")
             return
 
-        now = timezone.utcnow()
-        if now >= next_reschedule_date:
+        if (now := timezone.utcnow()) >= next_reschedule_date:
             yield self._passing_status(reason="Task instance id ready for 
reschedule.")
             return
 
diff --git 
a/airflow-core/tests/unit/ti_deps/deps/test_ready_to_reschedule_dep.py 
b/airflow-core/tests/unit/ti_deps/deps/test_ready_to_reschedule_dep.py
index 355cbfb9c30..27d8f4674d5 100644
--- a/airflow-core/tests/unit/ti_deps/deps/test_ready_to_reschedule_dep.py
+++ b/airflow-core/tests/unit/ti_deps/deps/test_ready_to_reschedule_dep.py
@@ -103,11 +103,6 @@ class TestNotInReschedulePeriodDep:
         dep_context = DepContext(ignore_in_reschedule_period=True)
         assert ReadyToRescheduleDep().is_met(ti=ti, dep_context=dep_context)
 
-    def test_should_pass_if_not_reschedule_mode(self, not_expected_tr_db_call):
-        ti = self._get_task_instance(State.UP_FOR_RESCHEDULE)
-        del ti.task.reschedule
-        assert ReadyToRescheduleDep().is_met(ti=ti)
-
     def test_should_pass_if_not_in_none_state(self, not_expected_tr_db_call):
         ti = self._get_task_instance(State.UP_FOR_RETRY)
         assert ReadyToRescheduleDep().is_met(ti=ti)
@@ -126,6 +121,17 @@ class TestNotInReschedulePeriodDep:
         self._create_task_reschedule(ti, [-21, -11, -1])
         assert ReadyToRescheduleDep().is_met(ti=ti)
 
+    def test_should_fail_before_reschedule_date_even_if_task_is_not_reschedule_mode(self):
+        """
+        When a task is in UP_FOR_RESCHEDULE state but the operator itself is 
not in reschedule mode
+        (i.e. reschedule was triggered by infrastructure/startup failure), we 
still must respect the
+        TaskReschedule.reschedule_date.
+        """
+        ti = self._get_task_instance(State.UP_FOR_RESCHEDULE)
+        del ti.task.reschedule
+        self._create_task_reschedule(ti, 1)
+        assert not ReadyToRescheduleDep().is_met(ti=ti)
+
     def test_should_fail_before_reschedule_date_one(self):
         ti = self._get_task_instance(State.UP_FOR_RESCHEDULE)
         self._create_task_reschedule(ti, 1)
@@ -142,11 +148,6 @@ class TestNotInReschedulePeriodDep:
         dep_context = DepContext(ignore_in_reschedule_period=True)
         assert ReadyToRescheduleDep().is_met(ti=ti, dep_context=dep_context)
 
-    def test_mapped_task_should_pass_if_not_reschedule_mode(self, 
not_expected_tr_db_call):
-        ti = self._get_task_instance(State.UP_FOR_RESCHEDULE, map_index=42)
-        del ti.task.reschedule
-        assert ReadyToRescheduleDep().is_met(ti=ti)
-
     def test_mapped_task_should_pass_if_not_in_none_state(self, 
not_expected_tr_db_call):
         ti = self._get_task_instance(State.UP_FOR_RETRY, map_index=42)
         assert ReadyToRescheduleDep().is_met(ti=ti)
diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py 
b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
index 6a64983edfa..4ef67fb7496 100644
--- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py
+++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
@@ -27,7 +27,7 @@ import sys
 import time
 from collections.abc import Callable, Iterable, Iterator, Mapping
 from contextlib import suppress
-from datetime import datetime, timezone
+from datetime import datetime, timedelta, timezone
 from itertools import product
 from pathlib import Path
 from typing import TYPE_CHECKING, Annotated, Any, Literal
@@ -41,7 +41,11 @@ from pydantic import AwareDatetime, ConfigDict, Field, 
JsonValue, TypeAdapter
 from airflow.configuration import conf
 from airflow.dag_processing.bundles.base import BaseDagBundle, 
BundleVersionLock
 from airflow.dag_processing.bundles.manager import DagBundlesManager
-from airflow.exceptions import AirflowInactiveAssetInInletOrOutletException, 
AirflowTaskTimeout
+from airflow.exceptions import (
+    AirflowInactiveAssetInInletOrOutletException,
+    AirflowRescheduleException,
+    AirflowTaskTimeout,
+)
 from airflow.listeners.listener import get_listener_manager
 from airflow.sdk.api.client import get_hostname, getuser
 from airflow.sdk.api.datamodels._generated import (
@@ -604,6 +608,33 @@ def _xcom_push_to_db(ti: RuntimeTaskInstance, key: str, 
value: Any) -> None:
     )
 
 
+def _maybe_reschedule_startup_failure(
+    *,
+    ti_context: TIRunContext,
+    log: Logger,
+) -> None:
+    """
+    Attempt to reschedule the task when a startup failure occurs.
+
+    This does not count as a retry. If the reschedule limit is exceeded, this 
function
+    returns and the caller should fail the task.
+    """
+    missing_dag_retires = conf.getint("workers", "missing_dag_retires", fallback=3)
+    missing_dag_retry_delay = conf.getint("workers", "missing_dag_retry_delay", fallback=60)
+
+    reschedule_count = int(getattr(ti_context, "task_reschedule_count", 0) or 0)
+    if missing_dag_retires > 0 and reschedule_count < missing_dag_retires:
+        raise AirflowRescheduleException(
+            reschedule_date=datetime.now(tz=timezone.utc) + timedelta(seconds=missing_dag_retry_delay)
+        )
+
+    log.error(
+        "Startup reschedule limit exceeded",
+        reschedule_count=reschedule_count,
+        max_reschedules=missing_dag_retires,
+    )
+
+
 def parse(what: StartupDetails, log: Logger) -> RuntimeTaskInstance:
     # TODO: Task-SDK:
     # Using DagBag here is about 98% wrong, but it'll do for now
@@ -638,6 +669,7 @@ def parse(what: StartupDetails, log: Logger) -> 
RuntimeTaskInstance:
         log.error(
             "Dag not found during start up", dag_id=what.ti.dag_id, 
bundle=bundle_info, path=what.dag_rel_path
         )
+        _maybe_reschedule_startup_failure(ti_context=what.ti_context, log=log)
         sys.exit(1)
 
     # install_loader()
@@ -652,6 +684,7 @@ def parse(what: StartupDetails, log: Logger) -> 
RuntimeTaskInstance:
             bundle=bundle_info,
             path=what.dag_rel_path,
         )
+        _maybe_reschedule_startup_failure(ti_context=what.ti_context, log=log)
         sys.exit(1)
 
     if not isinstance(task, (BaseOperator, MappedOperator)):
@@ -1547,7 +1580,17 @@ def main():
     SUPERVISOR_COMMS = CommsDecoder[ToTask, ToSupervisor](log=log)
 
     try:
-        ti, context, log = startup()
+        try:
+            ti, context, log = startup()
+        except AirflowRescheduleException as reschedule:
+            log.warning("Rescheduling task during startup, marking task as UP_FOR_RESCHEDULE")
+            SUPERVISOR_COMMS.send(
+                msg=RescheduleTask(
+                    reschedule_date=reschedule.reschedule_date,
+                    end_date=datetime.now(tz=timezone.utc),
+                )
+            )
+            sys.exit(0)
         with BundleVersionLock(
             bundle_name=ti.bundle_instance.name,
             bundle_version=ti.bundle_instance.version,
@@ -1557,10 +1600,10 @@ def main():
             finalize(ti, state, context, log, error)
     except KeyboardInterrupt:
         log.exception("Ctrl-c hit")
-        exit(2)
+        sys.exit(2)
     except Exception:
         log.exception("Top level error")
-        exit(1)
+        sys.exit(1)
     finally:
         # Ensure the request socket is closed on the child side in all 
circumstances
         # before the process fully terminates.
diff --git a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py 
b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
index cb0086195ac..b8f61a2bc50 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
@@ -23,7 +23,7 @@ import json
 import os
 import textwrap
 from collections.abc import Iterable
-from datetime import datetime, timedelta
+from datetime import datetime, timedelta, timezone as dt_timezone
 from pathlib import Path
 from typing import TYPE_CHECKING
 from unittest import mock
@@ -37,6 +37,7 @@ from uuid6 import uuid7
 from airflow.exceptions import (
     AirflowException,
     AirflowFailException,
+    AirflowRescheduleException,
     AirflowSensorTimeout,
     AirflowSkipException,
     AirflowTaskTerminated,
@@ -69,6 +70,7 @@ from airflow.sdk.definitions._internal.types import NOTSET, 
SET_DURING_EXECUTION
 from airflow.sdk.definitions.asset import Asset, AssetAlias, AssetUniqueKey, 
Dataset, Model
 from airflow.sdk.definitions.param import DagParam
 from airflow.sdk.exceptions import ErrorType
+from airflow.sdk.execution_time import task_runner
 from airflow.sdk.execution_time.comms import (
     AssetEventResult,
     AssetEventsResult,
@@ -91,6 +93,7 @@ from airflow.sdk.execution_time.comms import (
     OKResponse,
     PreviousDagRunResult,
     PrevSuccessfulDagRunResult,
+    RescheduleTask,
     SetRenderedFields,
     SetXCom,
     SkipDownstreamTasks,
@@ -125,6 +128,7 @@ from airflow.sdk.execution_time.task_runner import (
 )
 from airflow.sdk.execution_time.xcom import XCom
 
+from tests_common.test_utils.config import conf_vars
 from tests_common.test_utils.mock_operators import AirflowLink
 
 if TYPE_CHECKING:
@@ -236,7 +240,7 @@ def test_parse_not_found(test_dags_dir: Path, 
make_ti_context, dag_id, task_id,
                 ),
             },
         ),
-        pytest.raises(SystemExit),
+        pytest.raises(AirflowRescheduleException),
     ):
         parse(what, log)
 
@@ -244,6 +248,85 @@ def test_parse_not_found(test_dags_dir: Path, 
make_ti_context, dag_id, task_id,
     log.error.assert_has_calls([expected_error])
 
 
+def test_parse_not_found_does_not_reschedule_when_max_attempts_reached(test_dags_dir: Path, make_ti_context):
+    """
+    If the startup reschedule attempt limit is reached, parsing failures should not be rescheduled
+    and should surface as a hard failure (SystemExit in the task runner process).
+    """
+    what = StartupDetails(
+        ti=TaskInstance(
+            id=uuid7(),
+            task_id="a",
+            dag_id="madeup_dag_id",
+            run_id="c",
+            try_number=1,
+            dag_version_id=uuid7(),
+        ),
+        dag_rel_path="super_basic.py",
+        bundle_info=BundleInfo(name="my-bundle", version=None),
+        ti_context=make_ti_context(task_reschedule_count=3),
+        start_date=timezone.utcnow(),
+    )
+
+    log = mock.Mock()
+
+    with (
+        conf_vars(
+            {
+                ("workers", "missing_dag_retires"): "3",
+                ("workers", "missing_dag_retry_delay"): "60",
+            }
+        ),
+        patch.dict(
+            os.environ,
+            {
+                "AIRFLOW__DAG_PROCESSOR__DAG_BUNDLE_CONFIG_LIST": json.dumps(
+                    [
+                        {
+                            "name": "my-bundle",
+                            "classpath": 
"airflow.dag_processing.bundles.local.LocalDagBundle",
+                            "kwargs": {"path": str(test_dags_dir), 
"refresh_interval": 1},
+                        }
+                    ]
+                ),
+            },
+        ),
+        pytest.raises(SystemExit),
+    ):
+        parse(what, log)
+
+
+@mock.patch("builtins.exit", side_effect=lambda code: (_ for _ in ()).throw(SystemExit(code)))
+@mock.patch("airflow.sdk.execution_time.task_runner.startup")
+@mock.patch("airflow.sdk.execution_time.task_runner.CommsDecoder")
+def test_main_sends_reschedule_task_when_startup_reschedules(
+    mock_comms_decoder_cls, mock_startup, mock_exit, time_machine
+):
+    """
+    If startup raises AirflowRescheduleException, the task runner should 
report a RescheduleTask
+    message to the supervisor and exit cleanly (code 0).
+    """
+    ts = datetime(2025, 1, 1, tzinfo=dt_timezone.utc)
+    reschedule_date = ts + timedelta(seconds=60)
+
+    mock_comms_instance = mock.Mock()
+    mock_comms_instance.socket = None
+    mock_comms_decoder_cls.__getitem__.return_value.return_value = mock_comms_instance
+    mock_startup.side_effect = AirflowRescheduleException(reschedule_date=reschedule_date)
+
+    # Move time
+    time_machine.move_to(ts, tick=False)
+
+    # Run & assert
+    with pytest.raises(SystemExit) as exc:
+        task_runner.main()
+
+    assert exc.value.code == 0
+    assert mock_comms_instance.mock_calls == [
+        call.send(msg=RescheduleTask(reschedule_date=reschedule_date, end_date=ts))
+    ]
+
+
 def test_parse_module_in_bundle_root(tmp_path: Path, make_ti_context):
     """Check that the bundle path is added to sys.path, so Dags can import 
shared modules."""
     tmp_path.joinpath("util.py").write_text("NAME = 'dag_name'")

Reply via email to