This is an automated email from the ASF dual-hosted git repository.
weilee pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v3-1-test by this push:
new fddf4a72406 Reschedule tasks on worker startup Dag load failures
instead of exiting (#59604) (#60926)
fddf4a72406 is described below
commit fddf4a72406fda039dea813c0ff6e58dd0cd2891
Author: Wei Lee <[email protected]>
AuthorDate: Thu Jan 22 18:16:22 2026 +0800
Reschedule tasks on worker startup Dag load failures instead of exiting
(#59604) (#60926)
---
.../src/airflow/config_templates/config.yml | 27 +++++++
airflow-core/src/airflow/models/taskinstance.py | 12 +++
.../src/airflow/ti_deps/deps/base_ti_dep.py | 3 +-
.../airflow/ti_deps/deps/ready_to_reschedule.py | 36 +++++----
.../ti_deps/deps/test_ready_to_reschedule_dep.py | 21 +++---
.../src/airflow/sdk/execution_time/task_runner.py | 53 +++++++++++--
.../task_sdk/execution_time/test_task_runner.py | 87 +++++++++++++++++++++-
7 files changed, 205 insertions(+), 34 deletions(-)
diff --git a/airflow-core/src/airflow/config_templates/config.yml b/airflow-core/src/airflow/config_templates/config.yml
index d6f9d332655..169e2589a62 100644
--- a/airflow-core/src/airflow/config_templates/config.yml
+++ b/airflow-core/src/airflow/config_templates/config.yml
@@ -1603,6 +1603,33 @@ workers:
type: float
example: ~
default: "60.0"
+ missing_dag_retires:
+ description: |
+ Maximum number of times a task will be rescheduled if the worker fails to
+ load the Dag or task definition during startup.
+
+ This situation can occur due to transient infrastructure issues such as
+ missing Dag files, temporary filesystem or network problems, or bundle
+ synchronization delays. Rescheduling in this case does not count as a
+ task retry.
+
+ Set this value to 0 to disable rescheduling and fail the task immediately
+ on startup failures.
+ version_added: 3.1.7
+ type: integer
+ example: ~
+ default: "3"
+ missing_dag_retry_delay:
+ description: |
+ Delay in seconds before a task is rescheduled after a worker startup
+ failure caused by an inability to load the Dag or task definition.
+
+ This delay is applied when the task runner requests the scheduler to
+ reschedule the task instance in UP_FOR_RESCHEDULE state.
+ version_added: 3.1.7
+ type: integer
+ example: ~
+ default: "60"
api_auth:
description: Settings relating to authentication on the Airflow APIs
options:
diff --git a/airflow-core/src/airflow/models/taskinstance.py b/airflow-core/src/airflow/models/taskinstance.py
index 29f2d98eefa..94b9179c6e2 100644
--- a/airflow-core/src/airflow/models/taskinstance.py
+++ b/airflow-core/src/airflow/models/taskinstance.py
@@ -87,6 +87,7 @@ from airflow.settings import task_instance_mutation_hook
from airflow.stats import Stats
from airflow.ti_deps.dep_context import DepContext
from airflow.ti_deps.dependencies_deps import REQUEUEABLE_DEPS, RUNNING_DEPS
+from airflow.ti_deps.deps.ready_to_reschedule import ReadyToRescheduleDep
from airflow.utils.helpers import prune_dict
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.net import get_hostname
@@ -898,6 +899,17 @@ class TaskInstance(Base, LoggingMixin):
:param verbose: whether log details on failed dependencies on info or
debug log level
"""
dep_context = dep_context or DepContext()
+ if self.state == TaskInstanceState.UP_FOR_RESCHEDULE:
+ # This DepContext is used when a task instance is in UP_FOR_RESCHEDULE state.
+ #
+ # Tasks can be put into UP_FOR_RESCHEDULE by the task runner itself (e.g. when
+ # the worker cannot load the Dag or task). In this case, the scheduler must respect
+ # the task instance's reschedule_date before scheduling it again.
+ #
+ # ReadyToRescheduleDep is the only dependency that enforces this time-based gating.
+ # We therefore extend the normal scheduling dependency set with it, instead of
+ # modifying the global scheduler dependencies.
+ dep_context.deps.add(ReadyToRescheduleDep())
failed = False
verbose_aware_logger = self.log.info if verbose else self.log.debug
for dep_status in
self.get_failed_dep_statuses(dep_context=dep_context, session=session):
diff --git a/airflow-core/src/airflow/ti_deps/deps/base_ti_dep.py b/airflow-core/src/airflow/ti_deps/deps/base_ti_dep.py
index 4d54783b020..bc4fb303117 100644
--- a/airflow-core/src/airflow/ti_deps/deps/base_ti_dep.py
+++ b/airflow-core/src/airflow/ti_deps/deps/base_ti_dep.py
@@ -17,13 +17,14 @@
# under the License.
from __future__ import annotations
-from collections.abc import Iterator
from typing import TYPE_CHECKING, NamedTuple
from airflow.ti_deps.dep_context import DepContext
from airflow.utils.session import provide_session
if TYPE_CHECKING:
+ from collections.abc import Iterator
+
from sqlalchemy.orm import Session
from airflow.models.taskinstance import TaskInstance
diff --git a/airflow-core/src/airflow/ti_deps/deps/ready_to_reschedule.py b/airflow-core/src/airflow/ti_deps/deps/ready_to_reschedule.py
index 501b1574205..2855fa25499 100644
--- a/airflow-core/src/airflow/ti_deps/deps/ready_to_reschedule.py
+++ b/airflow-core/src/airflow/ti_deps/deps/ready_to_reschedule.py
@@ -17,13 +17,23 @@
# under the License.
from __future__ import annotations
+from typing import TYPE_CHECKING
+
from airflow._shared.timezones import timezone
-from airflow.executors.executor_loader import ExecutorLoader
from airflow.models.taskreschedule import TaskReschedule
from airflow.ti_deps.deps.base_ti_dep import BaseTIDep
from airflow.utils.session import provide_session
from airflow.utils.state import TaskInstanceState
+if TYPE_CHECKING:
+ from collections.abc import Iterator
+
+ from sqlalchemy.orm import Session
+
+ from airflow.models.taskinstance import TaskInstance
+ from airflow.ti_deps.dep_context import DepContext
+ from airflow.ti_deps.deps.base_ti_dep import TIDepStatus
+
class ReadyToRescheduleDep(BaseTIDep):
"""Determines whether a task is ready to be rescheduled."""
@@ -34,27 +44,22 @@ class ReadyToRescheduleDep(BaseTIDep):
RESCHEDULEABLE_STATES = {TaskInstanceState.UP_FOR_RESCHEDULE, None}
@provide_session
- def _get_dep_statuses(self, ti, session, dep_context):
+ def _get_dep_statuses(
+ self,
+ ti: TaskInstance,
+ session: Session,
+ dep_context: DepContext,
+ ) -> Iterator[TIDepStatus]:
"""
Determine whether a task is ready to be rescheduled.
- Only tasks in NONE state with at least one row in task_reschedule
table are
+ Only tasks in NONE or UP_FOR_RESCHEDULE state with at least one row in
task_reschedule table are
handled by this dependency class, otherwise this dependency is
considered as passed.
This dependency fails if the latest reschedule request's reschedule
date is still
in the future.
"""
from airflow.models.mappedoperator import MappedOperator
- is_mapped = isinstance(ti.task, MappedOperator)
- executor, _ = ExecutorLoader.import_default_executor_cls()
- if (
- # Mapped sensors don't have the reschedule property (it can only
be calculated after unmapping),
- # so we don't check them here. They are handled below by checking
TaskReschedule instead.
- not is_mapped and not getattr(ti.task, "reschedule", False)
- ):
- yield self._passing_status(reason="Task is not in reschedule
mode.")
- return
-
if dep_context.ignore_in_reschedule_period:
yield self._passing_status(
reason="The context specified that being in a reschedule
period was permitted."
@@ -75,14 +80,13 @@ class ReadyToRescheduleDep(BaseTIDep):
if not next_reschedule_date:
# Because mapped sensors don't have the reschedule property,
here's the last resort
# and we need a slightly different passing reason
- if is_mapped:
+ if isinstance(ti.task, MappedOperator):
yield self._passing_status(reason="The task is mapped and not
in reschedule mode")
return
yield self._passing_status(reason="There is no reschedule request
for this task instance.")
return
- now = timezone.utcnow()
- if now >= next_reschedule_date:
+ if (now := timezone.utcnow()) >= next_reschedule_date:
yield self._passing_status(reason="Task instance id ready for
reschedule.")
return
diff --git a/airflow-core/tests/unit/ti_deps/deps/test_ready_to_reschedule_dep.py b/airflow-core/tests/unit/ti_deps/deps/test_ready_to_reschedule_dep.py
index 355cbfb9c30..27d8f4674d5 100644
--- a/airflow-core/tests/unit/ti_deps/deps/test_ready_to_reschedule_dep.py
+++ b/airflow-core/tests/unit/ti_deps/deps/test_ready_to_reschedule_dep.py
@@ -103,11 +103,6 @@ class TestNotInReschedulePeriodDep:
dep_context = DepContext(ignore_in_reschedule_period=True)
assert ReadyToRescheduleDep().is_met(ti=ti, dep_context=dep_context)
- def test_should_pass_if_not_reschedule_mode(self, not_expected_tr_db_call):
- ti = self._get_task_instance(State.UP_FOR_RESCHEDULE)
- del ti.task.reschedule
- assert ReadyToRescheduleDep().is_met(ti=ti)
-
def test_should_pass_if_not_in_none_state(self, not_expected_tr_db_call):
ti = self._get_task_instance(State.UP_FOR_RETRY)
assert ReadyToRescheduleDep().is_met(ti=ti)
@@ -126,6 +121,17 @@ class TestNotInReschedulePeriodDep:
self._create_task_reschedule(ti, [-21, -11, -1])
assert ReadyToRescheduleDep().is_met(ti=ti)
+ def
test_should_fail_before_reschedule_date_even_if_task_is_not_reschedule_mode(self):
+ """
+ When a task is in UP_FOR_RESCHEDULE state but the operator itself is
not in reschedule mode
+ (i.e. reschedule was triggered by infrastructure/startup failure), we
still must respect the
+ TaskReschedule.reschedule_date.
+ """
+ ti = self._get_task_instance(State.UP_FOR_RESCHEDULE)
+ del ti.task.reschedule
+ self._create_task_reschedule(ti, 1)
+ assert not ReadyToRescheduleDep().is_met(ti=ti)
+
def test_should_fail_before_reschedule_date_one(self):
ti = self._get_task_instance(State.UP_FOR_RESCHEDULE)
self._create_task_reschedule(ti, 1)
@@ -142,11 +148,6 @@ class TestNotInReschedulePeriodDep:
dep_context = DepContext(ignore_in_reschedule_period=True)
assert ReadyToRescheduleDep().is_met(ti=ti, dep_context=dep_context)
- def test_mapped_task_should_pass_if_not_reschedule_mode(self,
not_expected_tr_db_call):
- ti = self._get_task_instance(State.UP_FOR_RESCHEDULE, map_index=42)
- del ti.task.reschedule
- assert ReadyToRescheduleDep().is_met(ti=ti)
-
def test_mapped_task_should_pass_if_not_in_none_state(self,
not_expected_tr_db_call):
ti = self._get_task_instance(State.UP_FOR_RETRY, map_index=42)
assert ReadyToRescheduleDep().is_met(ti=ti)
diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
index 6a64983edfa..4ef67fb7496 100644
--- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py
+++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
@@ -27,7 +27,7 @@ import sys
import time
from collections.abc import Callable, Iterable, Iterator, Mapping
from contextlib import suppress
-from datetime import datetime, timezone
+from datetime import datetime, timedelta, timezone
from itertools import product
from pathlib import Path
from typing import TYPE_CHECKING, Annotated, Any, Literal
@@ -41,7 +41,11 @@ from pydantic import AwareDatetime, ConfigDict, Field,
JsonValue, TypeAdapter
from airflow.configuration import conf
from airflow.dag_processing.bundles.base import BaseDagBundle,
BundleVersionLock
from airflow.dag_processing.bundles.manager import DagBundlesManager
-from airflow.exceptions import AirflowInactiveAssetInInletOrOutletException,
AirflowTaskTimeout
+from airflow.exceptions import (
+ AirflowInactiveAssetInInletOrOutletException,
+ AirflowRescheduleException,
+ AirflowTaskTimeout,
+)
from airflow.listeners.listener import get_listener_manager
from airflow.sdk.api.client import get_hostname, getuser
from airflow.sdk.api.datamodels._generated import (
@@ -604,6 +608,33 @@ def _xcom_push_to_db(ti: RuntimeTaskInstance, key: str,
value: Any) -> None:
)
+def _maybe_reschedule_startup_failure(
+ *,
+ ti_context: TIRunContext,
+ log: Logger,
+) -> None:
+ """
+ Attempt to reschedule the task when a startup failure occurs.
+
+ This does not count as a retry. If the reschedule limit is exceeded, this
function
+ returns and the caller should fail the task.
+ """
+ missing_dag_retires = conf.getint("workers", "missing_dag_retires",
fallback=3)
+ missing_dag_retry_delay = conf.getint("workers",
"missing_dag_retry_delay", fallback=60)
+
+ reschedule_count = int(getattr(ti_context, "task_reschedule_count", 0) or
0)
+ if missing_dag_retires > 0 and reschedule_count < missing_dag_retires:
+ raise AirflowRescheduleException(
+ reschedule_date=datetime.now(tz=timezone.utc) +
timedelta(seconds=missing_dag_retry_delay)
+ )
+
+ log.error(
+ "Startup reschedule limit exceeded",
+ reschedule_count=reschedule_count,
+ max_reschedules=missing_dag_retires,
+ )
+
+
def parse(what: StartupDetails, log: Logger) -> RuntimeTaskInstance:
# TODO: Task-SDK:
# Using DagBag here is about 98% wrong, but it'll do for now
@@ -638,6 +669,7 @@ def parse(what: StartupDetails, log: Logger) ->
RuntimeTaskInstance:
log.error(
"Dag not found during start up", dag_id=what.ti.dag_id,
bundle=bundle_info, path=what.dag_rel_path
)
+ _maybe_reschedule_startup_failure(ti_context=what.ti_context, log=log)
sys.exit(1)
# install_loader()
@@ -652,6 +684,7 @@ def parse(what: StartupDetails, log: Logger) ->
RuntimeTaskInstance:
bundle=bundle_info,
path=what.dag_rel_path,
)
+ _maybe_reschedule_startup_failure(ti_context=what.ti_context, log=log)
sys.exit(1)
if not isinstance(task, (BaseOperator, MappedOperator)):
@@ -1547,7 +1580,17 @@ def main():
SUPERVISOR_COMMS = CommsDecoder[ToTask, ToSupervisor](log=log)
try:
- ti, context, log = startup()
+ try:
+ ti, context, log = startup()
+ except AirflowRescheduleException as reschedule:
+ log.warning("Rescheduling task during startup, marking task as
UP_FOR_RESCHEDULE")
+ SUPERVISOR_COMMS.send(
+ msg=RescheduleTask(
+ reschedule_date=reschedule.reschedule_date,
+ end_date=datetime.now(tz=timezone.utc),
+ )
+ )
+ sys.exit(0)
with BundleVersionLock(
bundle_name=ti.bundle_instance.name,
bundle_version=ti.bundle_instance.version,
@@ -1557,10 +1600,10 @@ def main():
finalize(ti, state, context, log, error)
except KeyboardInterrupt:
log.exception("Ctrl-c hit")
- exit(2)
+ sys.exit(2)
except Exception:
log.exception("Top level error")
- exit(1)
+ sys.exit(1)
finally:
# Ensure the request socket is closed on the child side in all
circumstances
# before the process fully terminates.
diff --git a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
index cb0086195ac..b8f61a2bc50 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
@@ -23,7 +23,7 @@ import json
import os
import textwrap
from collections.abc import Iterable
-from datetime import datetime, timedelta
+from datetime import datetime, timedelta, timezone as dt_timezone
from pathlib import Path
from typing import TYPE_CHECKING
from unittest import mock
@@ -37,6 +37,7 @@ from uuid6 import uuid7
from airflow.exceptions import (
AirflowException,
AirflowFailException,
+ AirflowRescheduleException,
AirflowSensorTimeout,
AirflowSkipException,
AirflowTaskTerminated,
@@ -69,6 +70,7 @@ from airflow.sdk.definitions._internal.types import NOTSET,
SET_DURING_EXECUTION
from airflow.sdk.definitions.asset import Asset, AssetAlias, AssetUniqueKey,
Dataset, Model
from airflow.sdk.definitions.param import DagParam
from airflow.sdk.exceptions import ErrorType
+from airflow.sdk.execution_time import task_runner
from airflow.sdk.execution_time.comms import (
AssetEventResult,
AssetEventsResult,
@@ -91,6 +93,7 @@ from airflow.sdk.execution_time.comms import (
OKResponse,
PreviousDagRunResult,
PrevSuccessfulDagRunResult,
+ RescheduleTask,
SetRenderedFields,
SetXCom,
SkipDownstreamTasks,
@@ -125,6 +128,7 @@ from airflow.sdk.execution_time.task_runner import (
)
from airflow.sdk.execution_time.xcom import XCom
+from tests_common.test_utils.config import conf_vars
from tests_common.test_utils.mock_operators import AirflowLink
if TYPE_CHECKING:
@@ -236,7 +240,7 @@ def test_parse_not_found(test_dags_dir: Path,
make_ti_context, dag_id, task_id,
),
},
),
- pytest.raises(SystemExit),
+ pytest.raises(AirflowRescheduleException),
):
parse(what, log)
@@ -244,6 +248,85 @@ def test_parse_not_found(test_dags_dir: Path,
make_ti_context, dag_id, task_id,
log.error.assert_has_calls([expected_error])
+def
test_parse_not_found_does_not_reschedule_when_max_attempts_reached(test_dags_dir:
Path, make_ti_context):
+ """
+ If the startup reschedule attempt limit is reached, parsing failures
should not be rescheduled
+ and should surface as a hard failure (SystemExit in the task runner
process).
+ """
+ what = StartupDetails(
+ ti=TaskInstance(
+ id=uuid7(),
+ task_id="a",
+ dag_id="madeup_dag_id",
+ run_id="c",
+ try_number=1,
+ dag_version_id=uuid7(),
+ ),
+ dag_rel_path="super_basic.py",
+ bundle_info=BundleInfo(name="my-bundle", version=None),
+ ti_context=make_ti_context(task_reschedule_count=3),
+ start_date=timezone.utcnow(),
+ )
+
+ log = mock.Mock()
+
+ with (
+ conf_vars(
+ {
+ ("workers", "missing_dag_retires"): "3",
+ ("workers", "missing_dag_retry_delay"): "60",
+ }
+ ),
+ patch.dict(
+ os.environ,
+ {
+ "AIRFLOW__DAG_PROCESSOR__DAG_BUNDLE_CONFIG_LIST": json.dumps(
+ [
+ {
+ "name": "my-bundle",
+ "classpath":
"airflow.dag_processing.bundles.local.LocalDagBundle",
+ "kwargs": {"path": str(test_dags_dir),
"refresh_interval": 1},
+ }
+ ]
+ ),
+ },
+ ),
+ pytest.raises(SystemExit),
+ ):
+ parse(what, log)
+
+
[email protected]("builtins.exit", side_effect=lambda code: (_ for _ in
()).throw(SystemExit(code)))
[email protected]("airflow.sdk.execution_time.task_runner.startup")
[email protected]("airflow.sdk.execution_time.task_runner.CommsDecoder")
+def test_main_sends_reschedule_task_when_startup_reschedules(
+ mock_comms_decoder_cls, mock_startup, mock_exit, time_machine
+):
+ """
+ If startup raises AirflowRescheduleException, the task runner should
report a RescheduleTask
+ message to the supervisor and exit cleanly (code 0).
+ """
+ ts = datetime(2025, 1, 1, tzinfo=dt_timezone.utc)
+ reschedule_date = ts + timedelta(seconds=60)
+
+ mock_comms_instance = mock.Mock()
+ mock_comms_instance.socket = None
+ mock_comms_decoder_cls.__getitem__.return_value.return_value =
mock_comms_instance
+ mock_startup.side_effect =
AirflowRescheduleException(reschedule_date=reschedule_date)
+
+ # Move time
+ time_machine.move_to(ts, tick=False)
+
+ # Run & assert
+ with pytest.raises(SystemExit) as exc:
+ task_runner.main()
+
+ assert exc.value.code == 0
+ assert mock_comms_instance.mock_calls == [
+ call.send(msg=RescheduleTask(reschedule_date=reschedule_date,
end_date=ts))
+ ]
+
+
def test_parse_module_in_bundle_root(tmp_path: Path, make_ti_context):
"""Check that the bundle path is added to sys.path, so Dags can import
shared modules."""
tmp_path.joinpath("util.py").write_text("NAME = 'dag_name'")