This is an automated email from the ASF dual-hosted git repository.
vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new f7a7f3c76cb Allow deadline intervals to be configured via Airflow
Variables by supporting (#64751)
f7a7f3c76cb is described below
commit f7a7f3c76cbc9729836bc674ee36f491f711446b
Author: SameerMesiah97 <[email protected]>
AuthorDate: Thu May 28 21:31:35 2026 +0100
Allow deadline intervals to be configured via Airflow Variables by
supporting (#64751)
---
airflow-core/docs/howto/deadline-alerts.rst | 2 +-
airflow-core/docs/migrations-ref.rst | 4 +-
airflow-core/newsfragments/64751.feature.rst | 1 +
.../0116_3_3_0_add_team_name_to_trigger_table.py | 1 +
.../0117_3_3_0_change_deadline_interval_to_json.py | 305 +++++++++++++++++++++
airflow-core/src/airflow/models/deadline_alert.py | 17 +-
airflow-core/src/airflow/serialization/decoders.py | 24 +-
.../src/airflow/serialization/definitions/dag.py | 8 +-
.../airflow/serialization/definitions/deadline.py | 4 +-
airflow-core/src/airflow/serialization/encoders.py | 2 +-
airflow-core/src/airflow/utils/db.py | 2 +-
airflow-core/tests/unit/models/test_dag.py | 5 +-
airflow-core/tests/unit/models/test_dagrun.py | 86 +++++-
.../tests/unit/models/test_serialized_dag.py | 2 +-
.../unit/serialization/test_serialized_objects.py | 16 ++
task-sdk/src/airflow/sdk/definitions/deadline.py | 61 ++++-
.../tests/task_sdk/definitions/test_deadline.py | 52 +++-
17 files changed, 574 insertions(+), 18 deletions(-)
diff --git a/airflow-core/docs/howto/deadline-alerts.rst
b/airflow-core/docs/howto/deadline-alerts.rst
index 64f39c02440..e36908009a0 100644
--- a/airflow-core/docs/howto/deadline-alerts.rst
+++ b/airflow-core/docs/howto/deadline-alerts.rst
@@ -42,7 +42,7 @@ Creating a Deadline Alert
Creating a Deadline Alert requires three mandatory parameters:
* Reference: When to start counting from
-* Interval: How far before or after the reference point to trigger the alert
+* Interval: How far before or after the reference point to trigger the alert
(either a timedelta or a dynamic interval such as VariableInterval)
* Callback: A Callback object which contains a path to a callable and optional
kwargs to pass to it if the deadline is exceeded
Here is how Deadlines are calculated:
diff --git a/airflow-core/docs/migrations-ref.rst
b/airflow-core/docs/migrations-ref.rst
index 82f32c8a2fd..32fbc76feb0 100644
--- a/airflow-core/docs/migrations-ref.rst
+++ b/airflow-core/docs/migrations-ref.rst
@@ -39,7 +39,9 @@ Here's the list of all the Database Migrations that are
executed via when you ru
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
| Revision ID | Revises ID | Airflow Version | Description
|
+=========================+==================+===================+==============================================================+
-| ``acc215baed80`` (head) | ``a1b2c3d4e5f6`` | ``3.3.0`` | Add
team_name to trigger table. |
+| ``8812eb67b63c`` (head) | ``acc215baed80`` | ``3.3.0`` | Change
Deadline interval to JSON. |
++-------------------------+------------------+-------------------+--------------------------------------------------------------+
+| ``acc215baed80`` | ``a1b2c3d4e5f6`` | ``3.3.0`` | Add
team_name to trigger table. |
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
| ``a1b2c3d4e5f6`` | ``a7f3b2c1d4e5`` | ``3.3.0`` | Add
version_data to dag_version. |
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
diff --git a/airflow-core/newsfragments/64751.feature.rst
b/airflow-core/newsfragments/64751.feature.rst
new file mode 100644
index 00000000000..41d647f143d
--- /dev/null
+++ b/airflow-core/newsfragments/64751.feature.rst
@@ -0,0 +1 @@
+Allow DeadlineAlert intervals to be dynamically resolved at Deadline
evaluation using objects such as VariableInterval.
diff --git
a/airflow-core/src/airflow/migrations/versions/0116_3_3_0_add_team_name_to_trigger_table.py
b/airflow-core/src/airflow/migrations/versions/0116_3_3_0_add_team_name_to_trigger_table.py
index 503f1b58d7c..e217cbeb45c 100644
---
a/airflow-core/src/airflow/migrations/versions/0116_3_3_0_add_team_name_to_trigger_table.py
+++
b/airflow-core/src/airflow/migrations/versions/0116_3_3_0_add_team_name_to_trigger_table.py
@@ -22,6 +22,7 @@ Add team_name to trigger table.
Revision ID: acc215baed80
Revises: a1b2c3d4e5f6
Create Date: 2026-05-21 21:38:00.122692
+
"""
from __future__ import annotations
diff --git
a/airflow-core/src/airflow/migrations/versions/0117_3_3_0_change_deadline_interval_to_json.py
b/airflow-core/src/airflow/migrations/versions/0117_3_3_0_change_deadline_interval_to_json.py
new file mode 100644
index 00000000000..04e5a35aa31
--- /dev/null
+++
b/airflow-core/src/airflow/migrations/versions/0117_3_3_0_change_deadline_interval_to_json.py
@@ -0,0 +1,305 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Change Deadline interval to JSON.
+
+Revision ID: 8812eb67b63c
+Revises: acc215baed80
+Create Date: 2026-05-28 17:36:56.837243
+
+"""
+
+from __future__ import annotations
+
+import sqlalchemy as sa
+from alembic import context, op
+
+# revision identifiers, used by Alembic.
+revision = "8812eb67b63c"
+down_revision = "acc215baed80"
+branch_labels = None
+depends_on = None
+airflow_version = "3.3.0"
+
+
+def upgrade():
+ """Apply change deadline interval to JSON."""
+ conn = op.get_bind()
+ dialect = conn.dialect.name
+
+ if context.is_offline_mode():
+ print(
+ """
+ Manual conversion required:
+
+ PostgreSQL:
+
+ Step 1: Convert column type.
+ ALTER TABLE deadline_alert
+ ALTER COLUMN interval TYPE JSONB
+ USING to_json(interval);
+
+ Step 2: Convert values.
+ UPDATE deadline_alert
+ SET interval = json_build_object(
+ '__classname__', 'datetime.timedelta',
+ '__version__', 2,
+ '__data__', (interval::text)::float
+ )
+ WHERE jsonb_typeof(interval::jsonb) = 'number';
+
+ MySQL:
+
+ Step 1: Convert column type.
+ ALTER TABLE deadline_alert MODIFY COLUMN `interval` JSON;
+
+ Step 2: Convert values
+ UPDATE deadline_alert
+ SET `interval` = JSON_OBJECT(
+ '__classname__', 'datetime.timedelta',
+ '__version__', 2,
+ '__data__', `interval`
+ );
+
+ SQLite:
+
+ UPDATE deadline_alert
+ SET interval =
+
'{"__classname__":"datetime.timedelta","__version__":2,"__data__":'
+ || CAST(interval AS TEXT) || '}';
+ """
+ )
+ return
+
+ with op.batch_alter_table("deadline_alert") as batch_op:
+ if dialect == "postgresql":
+ batch_op.alter_column(
+ "interval",
+ existing_type=sa.FLOAT(),
+ type_=sa.JSON(),
+ postgresql_using="to_json(interval)",
+ existing_nullable=False,
+ )
+ else:
+ batch_op.alter_column(
+ "interval",
+ existing_type=sa.FLOAT(),
+ type_=sa.JSON(),
+ existing_nullable=False,
+ )
+
+ if dialect == "postgresql":
+ op.execute("""
+ UPDATE deadline_alert
+ SET interval = json_build_object(
+ '__classname__', 'datetime.timedelta',
+ '__version__', 2,
+ '__data__', (interval::text)::float
+ )
+ WHERE jsonb_typeof(interval::jsonb) = 'number'
+ """)
+
+ elif dialect == "mysql":
+ op.execute("""
+ UPDATE deadline_alert
+ SET `interval` = JSON_OBJECT(
+ '__classname__', 'datetime.timedelta',
+ '__version__', 2,
+ '__data__', `interval`
+ )
+ """)
+
+ else:
+ op.execute("""
+ UPDATE deadline_alert
+ SET interval =
+ '{"__classname__":"datetime.timedelta","__version__":'
+ || '2' ||
+ ',"__data__":' || CAST(interval AS TEXT) || '}'
+ """)
+
+
+def downgrade():
+ """Revert deadline interval back to float."""
+ conn = op.get_bind()
+ dialect = conn.dialect.name
+
+ if context.is_offline_mode():
+ print(
+ """
+ Manual downgrade required:
+
+ PostgreSQL:
+
+ Step 1: Convert values.
+ UPDATE deadline_alert
+ SET interval =
+ CASE
+ WHEN jsonb_typeof(interval::jsonb) = 'number'
+ THEN interval
+ WHEN (interval::jsonb)->>'__classname__' =
'datetime.timedelta'
+ THEN to_json((interval->>'__data__')::double precision)
+ ELSE NULL
+ END;
+
+ Step 2: Convert column type.
+ ALTER TABLE deadline_alert
+ ALTER COLUMN interval TYPE DOUBLE PRECISION
+ USING (
+ CASE
+ WHEN jsonb_typeof(interval::jsonb) = 'number'
+ THEN interval::text::double precision
+ WHEN (interval::jsonb)->>'__classname__' =
'datetime.timedelta'
+ THEN (interval->>'__data__')::double precision
+ ELSE NULL
+ END
+ );
+
+ MySQL:
+
+ Step 1: Convert values
+ UPDATE deadline_alert
+ SET `interval` =
+ CASE
+ WHEN JSON_EXTRACT(`interval`, '$.__data__') IS NOT NULL
+ THEN CAST(JSON_EXTRACT(`interval`, '$.__data__') AS
DOUBLE)
+ WHEN JSON_EXTRACT(`interval`, '$.__classname__') IS NULL
+ THEN CAST(`interval` AS DOUBLE)
+ ELSE NULL
+ END;
+
+ Step 2: Convert column type
+ ALTER TABLE deadline_alert
+ MODIFY COLUMN `interval` DOUBLE;
+
+ SQLite:
+
+ Step 1: Convert values
+ UPDATE deadline_alert
+ SET interval =
+ CASE
+ WHEN json_extract(interval, '$.__data__') IS NOT NULL
+ THEN CAST(json_extract(interval, '$.__data__') AS REAL)
+ WHEN json_extract(interval, '$.__classname__') IS NULL
+ THEN CAST(interval AS REAL)
+ ELSE NULL
+ END;
+
+ Step 2: SQLite does not support ALTER COLUMN TYPE.
+ Recreate the table with interval as REAL and copy data.
+ """
+ )
+ return
+
+ if dialect == "postgresql":
+ op.execute("""
+ UPDATE deadline_alert
+ SET interval =
+ CASE
+ WHEN jsonb_typeof(interval::jsonb) = 'number'
+ THEN interval
+ WHEN (interval::jsonb)->>'__classname__' =
'datetime.timedelta'
+ THEN to_json((interval->>'__data__')::double precision)
+ ELSE NULL
+ END
+ """)
+
+ elif dialect == "mysql":
+ op.execute("""
+ UPDATE deadline_alert
+ SET `interval` =
+ CASE
+ WHEN JSON_EXTRACT(`interval`, '$.__data__') IS NOT NULL
+ THEN CAST(JSON_EXTRACT(`interval`, '$.__data__') AS DOUBLE)
+ WHEN JSON_EXTRACT(`interval`, '$.__classname__') IS NULL
+ THEN CAST(`interval` AS DOUBLE)
+ ELSE NULL
+ END
+ """)
+
+ # Serialized VariableInterval objects do not contain a numeric "__data__"
field
+ # and therefore cannot be converted back to a float representation.
+ # During downgrade, only timedelta-style serialized values are converted.
+ # Other serialized interval types (e.g. VariableInterval) will cast as
null.
+ else:
+ # Detect availability of SQLite JSON functions (JSON1 extension).
+ json_functions_available = False
+ try:
+ conn.execute(sa.text("SELECT json_extract('{\"a\":1}',
'$.a')")).fetchone()
+ json_functions_available = True
+ except Exception:
+ print("SQLite JSON functions not available, using string parsing
as fallback.")
+
+ if json_functions_available:
+ op.execute("""
+ UPDATE deadline_alert
+ SET interval =
+ CASE
+ WHEN json_extract(interval, '$.__data__') IS NOT NULL
+ THEN CAST(json_extract(interval, '$.__data__') AS REAL)
+ WHEN json_extract(interval, '$.__classname__') IS NULL
+ THEN CAST(interval AS REAL)
+ ELSE NULL
+ END
+ """)
+ else:
+ # NOTE: This is a best-effort fallback for environments without
JSON1.
+ # It assumes a stable JSON format and may not work for all
serialized values.
+ op.execute("""
+ UPDATE deadline_alert
+ SET interval =
+ CASE
+ WHEN instr(interval, '__data__') > 0
+ THEN CAST(
+ substr(
+ interval,
+ instr(interval, '__data__') +
+ instr(substr(interval, instr(interval,
'__data__')), ':')
+ ) AS FLOAT
+ )
+ WHEN instr(interval, '__classname__') = 0
+ THEN CAST(interval AS FLOAT)
+ ELSE NULL
+ END
+ """)
+
+ with op.batch_alter_table("deadline_alert") as batch_op:
+ if dialect == "postgresql":
+ batch_op.alter_column(
+ "interval",
+ existing_type=sa.JSON(),
+ type_=sa.FLOAT(),
+ postgresql_using="""
+ CASE
+ WHEN jsonb_typeof(interval::jsonb) = 'number'
+ THEN interval::text::double precision
+ WHEN (interval::jsonb)->>'__classname__' =
'datetime.timedelta'
+ THEN (interval->>'__data__')::double precision
+ ELSE NULL
+ END
+ """,
+ existing_nullable=False,
+ )
+ else:
+ batch_op.alter_column(
+ "interval",
+ existing_type=sa.JSON(),
+ type_=sa.FLOAT(),
+ existing_nullable=False,
+ )
diff --git a/airflow-core/src/airflow/models/deadline_alert.py
b/airflow-core/src/airflow/models/deadline_alert.py
index 0b8a8eba9b1..d9b6590c0f7 100644
--- a/airflow-core/src/airflow/models/deadline_alert.py
+++ b/airflow-core/src/airflow/models/deadline_alert.py
@@ -21,7 +21,7 @@ from typing import TYPE_CHECKING
from uuid import UUID
import uuid6
-from sqlalchemy import JSON, Float, ForeignKey, String, Text, Uuid, select
+from sqlalchemy import JSON, ForeignKey, String, Text, Uuid, select
from sqlalchemy.exc import NoResultFound
from sqlalchemy.orm import Mapped, mapped_column
@@ -50,13 +50,22 @@ class DeadlineAlert(Base):
name: Mapped[str | None] = mapped_column(String(250), nullable=True)
description: Mapped[str | None] = mapped_column(Text, nullable=True)
reference: Mapped[dict] = mapped_column(JSON, nullable=False)
- interval: Mapped[float] = mapped_column(Float, nullable=False)
+ interval: Mapped[dict] = mapped_column(JSON, nullable=False)
callback_def: Mapped[dict] = mapped_column(JSON, nullable=False)
def __repr__(self):
- interval_seconds = int(self.interval)
- if interval_seconds >= 3600:
+ interval_seconds = None
+
+ if isinstance(self.interval, (int, float)):
+ interval_seconds = int(self.interval)
+
+ elif isinstance(self.interval, datetime.timedelta):
+ interval_seconds = int(self.interval.total_seconds())
+
+ if interval_seconds is None:
+ interval_display = "dynamic"
+ elif interval_seconds >= 3600:
interval_display = f"{interval_seconds // 3600}h"
elif interval_seconds >= 60:
interval_display = f"{interval_seconds // 60}m"
diff --git a/airflow-core/src/airflow/serialization/decoders.py
b/airflow-core/src/airflow/serialization/decoders.py
index 22683ef5d61..b36b7a8a524 100644
--- a/airflow-core/src/airflow/serialization/decoders.py
+++ b/airflow-core/src/airflow/serialization/decoders.py
@@ -156,6 +156,7 @@ def decode_deadline_alert(encoded_data: dict):
:meta private:
"""
+ from airflow.sdk.definitions.deadline import VariableInterval
from airflow.sdk.serde import deserialize
data = encoded_data.get(Encoding.VAR, encoded_data)
@@ -163,9 +164,30 @@ def decode_deadline_alert(encoded_data: dict):
reference_data = data[DeadlineAlertFields.REFERENCE]
reference = decode_deadline_reference(reference_data)
+ raw_interval = data[DeadlineAlertFields.INTERVAL]
+
+ if raw_interval is None:
+ raise ValueError(
+ "DeadlineAlert interval is missing. This can happen after
downgrading "
+ "from a version that supports VariableInterval. Downgrade is not
fully reversible."
+ )
+
+ interval: datetime.timedelta | VariableInterval
+
+ # Backward compatibility: previously interval was stored as
total_seconds() (float/int).
+ # Handle numeric values by converting to timedelta.
+ if isinstance(raw_interval, (int, float)):
+ interval = datetime.timedelta(seconds=raw_interval)
+ else:
+ deserialized = deserialize(raw_interval)
+ if isinstance(deserialized, (datetime.timedelta, VariableInterval)):
+ interval = deserialized
+ else:
+ raise TypeError(f"Invalid interval type:
{type(deserialized).__name__}")
+
return SerializedDeadlineAlert(
reference=reference,
-
interval=datetime.timedelta(seconds=data[DeadlineAlertFields.INTERVAL]),
+ interval=interval,
callback=deserialize(data[DeadlineAlertFields.CALLBACK]),
name=data.get(DeadlineAlertFields.NAME),
)
diff --git a/airflow-core/src/airflow/serialization/definitions/dag.py
b/airflow-core/src/airflow/serialization/definitions/dag.py
index d20b9c22c03..6fb7bf083cf 100644
--- a/airflow-core/src/airflow/serialization/definitions/dag.py
+++ b/airflow-core/src/airflow/serialization/definitions/dag.py
@@ -41,6 +41,7 @@ from airflow.models.deadline import Deadline
from airflow.models.deadline_alert import DeadlineAlert as DeadlineAlertModel
from airflow.models.taskinstancekey import TaskInstanceKey
from airflow.models.tasklog import LogTemplate
+from airflow.sdk.definitions.deadline import VariableInterval
from airflow.serialization.decoders import decode_deadline_alert
from airflow.serialization.definitions.deadline import DeadlineAlertFields,
SerializedReferenceModels
from airflow.serialization.definitions.param import SerializedParamsDict
@@ -653,10 +654,15 @@ class SerializedDAG:
}
)
+ interval = deserialized_deadline_alert.interval
+
+ if isinstance(interval, VariableInterval):
+ interval = interval.resolve()
+
if isinstance(deserialized_deadline_alert.reference,
SerializedReferenceModels.TYPES.DAGRUN):
deadline_time =
deserialized_deadline_alert.reference.evaluate_with(
session=session,
- interval=deserialized_deadline_alert.interval,
+ interval=interval,
# TODO : Pretty sure we can drop these last two; verify
after testing is complete
dag_id=self.dag_id,
run_id=orm_dagrun.run_id,
diff --git a/airflow-core/src/airflow/serialization/definitions/deadline.py
b/airflow-core/src/airflow/serialization/definitions/deadline.py
index 58eaa46e6f7..89e231cba24 100644
--- a/airflow-core/src/airflow/serialization/definitions/deadline.py
+++ b/airflow-core/src/airflow/serialization/definitions/deadline.py
@@ -38,6 +38,8 @@ if TYPE_CHECKING:
from sqlalchemy import ColumnElement
from sqlalchemy.orm import Session
+ from airflow.sdk.definitions.deadline import VariableInterval
+
logger = logging.getLogger(__name__)
@@ -366,6 +368,6 @@ class SerializedDeadlineAlert:
"""Serialized representation of a deadline alert."""
reference: SerializedReferenceModels.SerializedBaseDeadlineReference
- interval: timedelta
+ interval: timedelta | VariableInterval
callback: Any
name: str | None = None
diff --git a/airflow-core/src/airflow/serialization/encoders.py
b/airflow-core/src/airflow/serialization/encoders.py
index e97dcff2623..b9caea4cc37 100644
--- a/airflow-core/src/airflow/serialization/encoders.py
+++ b/airflow-core/src/airflow/serialization/encoders.py
@@ -226,7 +226,7 @@ def encode_deadline_alert(d: DeadlineAlert |
SerializedDeadlineAlert) -> dict[st
return {
"name": d.name,
"reference": encode_deadline_reference(d.reference),
- "interval": d.interval.total_seconds(),
+ "interval": serialize(d.interval),
"callback": serialize(d.callback),
}
diff --git a/airflow-core/src/airflow/utils/db.py
b/airflow-core/src/airflow/utils/db.py
index 00d512909dc..0d3bf5ef1aa 100644
--- a/airflow-core/src/airflow/utils/db.py
+++ b/airflow-core/src/airflow/utils/db.py
@@ -116,7 +116,7 @@ _REVISION_HEADS_MAP: dict[str, str] = {
"3.1.0": "cc92b33c6709",
"3.1.8": "509b94a1042d",
"3.2.0": "1d6611b6ab7c",
- "3.3.0": "acc215baed80",
+ "3.3.0": "8812eb67b63c",
}
# Prefix used to identify tables holding data moved during migration.
diff --git a/airflow-core/tests/unit/models/test_dag.py
b/airflow-core/tests/unit/models/test_dag.py
index f9751684e35..0a15c9ab09d 100644
--- a/airflow-core/tests/unit/models/test_dag.py
+++ b/airflow-core/tests/unit/models/test_dag.py
@@ -2229,7 +2229,10 @@ my_postgres_conn:
).all()
assert len(stored_alerts) == expected_num_deadlines
- intervals = sorted([alert.interval for alert in stored_alerts])
+ intervals = sorted(
+ alert.interval["__data__"] if isinstance(alert.interval, dict)
else alert.interval
+ for alert in stored_alerts
+ )
assert intervals == [300.0, 600.0, 3600.0]
# Now create a dagrun and verify deadlines are created
diff --git a/airflow-core/tests/unit/models/test_dagrun.py
b/airflow-core/tests/unit/models/test_dagrun.py
index 5f555ba69d0..7ac2514cbf7 100644
--- a/airflow-core/tests/unit/models/test_dagrun.py
+++ b/airflow-core/tests/unit/models/test_dagrun.py
@@ -61,7 +61,9 @@ from airflow.providers.standard.operators.empty import
EmptyOperator
from airflow.providers.standard.operators.python import PythonOperator,
ShortCircuitOperator
from airflow.sdk import DAG, BaseOperator, get_current_context, setup, task,
task_group, teardown
from airflow.sdk.definitions.callback import AsyncCallback
-from airflow.sdk.definitions.deadline import DeadlineAlert, DeadlineReference
+from airflow.sdk.definitions.deadline import DeadlineAlert, DeadlineReference,
VariableInterval
+from airflow.sdk.definitions.variable import Variable
+from airflow.sdk.exceptions import AirflowRuntimeError
from airflow.serialization.definitions.deadline import
SerializedReferenceModels
from airflow.serialization.serialized_objects import LazyDeserializedDAG
from airflow.settings import get_policy_plugin_manager
@@ -1326,17 +1328,28 @@ class TestDagRun:
assert isinstance(dag_run.dag_versions, list)
assert len(dag_run.dag_versions) == 0
+ @pytest.mark.parametrize(
+ "interval",
+ [
+ datetime.timedelta(hours=1),
+ VariableInterval("my_key"),
+ ],
+ )
+ @mock.patch.object(Variable, "get")
@mock.patch.object(Deadline, "prune_deadlines")
- def test_dagrun_success_deadline(self, _, session, deadline_test_dag):
+ def test_dagrun_success_deadline(self, _, mock_get, interval, session,
deadline_test_dag):
def on_success_callable(context):
assert context["dag_run"].dag_id == "test_dag"
future_date = datetime.datetime.now() + datetime.timedelta(days=365)
+ # First value used during resolution
+ mock_get.return_value = "5"
+
scheduler_dag = deadline_test_dag(
deadline=DeadlineAlert(
reference=DeadlineReference.FIXED_DATETIME(future_date),
- interval=datetime.timedelta(hours=1),
+ interval=interval,
callback=AsyncCallback(empty_callback_for_deadline),
),
on_success_callback=on_success_callable,
@@ -1441,6 +1454,73 @@ class TestDagRun:
mock_prune.assert_not_called()
assert dag_run.state == DagRunState.SUCCESS
+ @mock.patch.object(Variable, "get")
+ @mock.patch.object(Deadline, "prune_deadlines")
+ def test_dagrun_deadline_variable_interval_stable(self, _, mock_get,
session, deadline_test_dag):
+ future_date = datetime.datetime.now() + datetime.timedelta(days=365)
+
+ # First value used during resolution.
+ mock_get.return_value = "60"
+
+ scheduler_dag = deadline_test_dag(
+ deadline=DeadlineAlert(
+ reference=DeadlineReference.FIXED_DATETIME(future_date),
+ interval=VariableInterval("my_key"),
+ callback=AsyncCallback(empty_callback_for_deadline),
+ ),
+ )
+
+ dag_run = self.create_dag_run(
+ dag=scheduler_dag,
+ task_states={"task_1": TaskInstanceState.SUCCESS, "task_2":
TaskInstanceState.SUCCESS},
+ session=session,
+ )
+ dag_run.dag = scheduler_dag
+
+ # First update resolve interval to "5".
+ dag_run.update_state(session=session)
+
+ deadline = session.execute(select(Deadline)).scalars().one_or_none()
+ first_deadline_time = deadline.deadline_time
+
+ # Change Variable value after resolution.
+ mock_get.return_value = "120"
+
+ # Run again (This should not change existing deadline).
+ dag_run.update_state(session=session)
+
+ deadline = session.execute(select(Deadline)).scalars().one_or_none()
+ assert deadline.deadline_time == first_deadline_time
+
+ @mock.patch.object(Deadline, "prune_deadlines")
+ def test_dagrun_deadline_variable_interval_missing_variable_fails(self, _,
session, deadline_test_dag):
+
+ mock_err = mock.Mock()
+ mock_err.error.value = "MISSING_DEADLINE"
+ mock_err.detail = "missing deadline"
+
+ with mock.patch.object(
+ Variable,
+ "get",
+ side_effect=AirflowRuntimeError(mock_err),
+ ):
+ future_date = datetime.datetime.now() +
datetime.timedelta(days=365)
+
+ scheduler_dag = deadline_test_dag(
+ deadline=DeadlineAlert(
+ reference=DeadlineReference.FIXED_DATETIME(future_date),
+ interval=VariableInterval("missing_key"),
+ callback=AsyncCallback(empty_callback_for_deadline),
+ ),
+ )
+
+ with pytest.raises(ValueError, match="not found"):
+ self.create_dag_run(
+ dag=scheduler_dag,
+ task_states={"task_1": TaskInstanceState.SUCCESS},
+ session=session,
+ )
+
@pytest.mark.parametrize(
("run_type", "expected_tis"),
diff --git a/airflow-core/tests/unit/models/test_serialized_dag.py
b/airflow-core/tests/unit/models/test_serialized_dag.py
index 54438f8e82f..765b2adf206 100644
--- a/airflow-core/tests/unit/models/test_serialized_dag.py
+++ b/airflow-core/tests/unit/models/test_serialized_dag.py
@@ -849,7 +849,7 @@ class TestSerializedDagModel:
# There should be a second serdag with a new hash and the new interval.
assert new_serdag_count == 2
assert new_serdag.dag_hash != orig_serdag.dag_hash
- assert new_alert.interval == 600.0
+ assert new_alert.interval["__data__"] == 600.0
def test_deadline_name_change_updates_db_and_returns_true(self,
testing_dag_bundle, session):
"""Name-only deadline change: UUID reused, DB row updated, write_dag
returns True."""
diff --git a/airflow-core/tests/unit/serialization/test_serialized_objects.py
b/airflow-core/tests/unit/serialization/test_serialized_objects.py
index 1767209ca73..6d558628e74 100644
--- a/airflow-core/tests/unit/serialization/test_serialized_objects.py
+++ b/airflow-core/tests/unit/serialization/test_serialized_objects.py
@@ -513,6 +513,22 @@ def test_serialize_deserialize_deadline_alert(reference):
assert deserialized.callback == original.callback
+def test_deserialize_deadline_alert_none_interval_raises():
+ valid = DeadlineAlert(
+ reference=DeadlineReference.DAGRUN_QUEUED_AT,
+ interval=timedelta(hours=1),
+ callback=AsyncCallback(TEST_CALLBACK_PATH,
kwargs=TEST_CALLBACK_KWARGS),
+ )
+
+ serialized = BaseSerialization.serialize(valid)
+
+ # Inject downgrade corruption.
+ serialized[Encoding.VAR][DeadlineAlertFields.INTERVAL] = None
+
+ with pytest.raises(ValueError, match="interval"):
+ BaseSerialization.deserialize(serialized)
+
+
@pytest.mark.parametrize(
"conn_uri",
[
diff --git a/task-sdk/src/airflow/sdk/definitions/deadline.py
b/task-sdk/src/airflow/sdk/definitions/deadline.py
index 2ab20d056b9..a9da3dd3d3e 100644
--- a/task-sdk/src/airflow/sdk/definitions/deadline.py
+++ b/task-sdk/src/airflow/sdk/definitions/deadline.py
@@ -22,7 +22,11 @@ from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import TYPE_CHECKING, Any
+import attrs
+
from airflow.sdk.definitions.callback import AsyncCallback, Callback,
SyncCallback
+from airflow.sdk.definitions.variable import Variable
+from airflow.sdk.exceptions import AirflowRuntimeError
if TYPE_CHECKING:
from collections.abc import Callable
@@ -143,7 +147,7 @@ class DeadlineAlert:
def __init__(
self,
reference: DeadlineReferenceType,
- interval: timedelta,
+ interval: timedelta | VariableInterval,
callback: Callback,
name: str | None = None,
):
@@ -342,3 +346,58 @@ def deadline_reference(
return reference_class
return decorator
+
+
[email protected](frozen=True)
+class VariableInterval:
+ """
+ Interval backed by an Airflow Variable.
+
+ This allows DeadlineAlert intervals to be configured dynamically using
+ Airflow Variables. The variable value is interpreted as seconds and
+ converted into a ``timedelta`` object.
+
+ ------
+ Usage:
+ ------
+
+ .. code-block:: python
+
+ from airflow.sdk import DAG, DeadlineAlert, DeadlineReference,
AsyncCallback
+
+ DAG(
+ dag_id="dag_with_variable_interval",
+ deadline=DeadlineAlert(
+ reference=DeadlineReference.DAGRUN_QUEUED_AT,
+ interval=VariableInterval("deadline_seconds"),
+ callback=AsyncCallback(my_callback),
+ ),
+ )
+
+ ------
+ Notes:
+ ------
+ * Resolution occurs when deadlines are evaluated (during DagRun creation).
+ * Changes to the Variable affect only newly parsed DAGs and future DagRuns.
+ * Existing deadlines are not retroactively updated.
+ """
+
+ key: str
+
+ def resolve(self) -> timedelta:
+ try:
+ value = Variable.get(self.key)
+ except AirflowRuntimeError as e:
+ raise ValueError(f"VariableInterval '{self.key}' not found") from e
+
+ try:
+ seconds = int(value)
+ except (TypeError, ValueError) as e:
+ raise ValueError(
+ f"VariableInterval '{self.key}' must be an integer (seconds),
got: {value!r}"
+ ) from e
+
+ if seconds <= 0:
+ raise ValueError(f"VariableInterval '{self.key}' must be > 0, got:
{seconds}")
+
+ return timedelta(seconds=seconds)
diff --git a/task-sdk/tests/task_sdk/definitions/test_deadline.py
b/task-sdk/tests/task_sdk/definitions/test_deadline.py
index 8e9e816b307..b104980e4c9 100644
--- a/task-sdk/tests/task_sdk/definitions/test_deadline.py
+++ b/task-sdk/tests/task_sdk/definitions/test_deadline.py
@@ -17,12 +17,15 @@
from __future__ import annotations
from datetime import datetime, timedelta
+from unittest import mock
import pytest
from task_sdk.definitions.test_callback import TEST_CALLBACK_KWARGS,
TEST_CALLBACK_PATH, UNIMPORTABLE_DOT_PATH
from airflow.sdk.definitions.callback import AsyncCallback, SyncCallback
-from airflow.sdk.definitions.deadline import DeadlineAlert, DeadlineReference
+from airflow.sdk.definitions.deadline import DeadlineAlert, DeadlineReference,
VariableInterval
+from airflow.sdk.definitions.variable import Variable
+from airflow.sdk.exceptions import AirflowRuntimeError
DAG_ID = "dag_id_1"
RUN_ID = 1
@@ -162,3 +165,50 @@ class TestDeadlineAlert:
interval=timedelta(hours=1),
callback="not_a_callback", # type: ignore
)
+
+
+class TestVariableInterval:
+ @pytest.mark.parametrize(
+ ("value", "expected"),
+ [
+ ("3", timedelta(seconds=3)),
+ ("10", timedelta(seconds=10)),
+ ("05", timedelta(seconds=5)), # leading zero
+ ],
+ )
+ def test_resolve_valid(self, mocker, value, expected):
+ mocker.patch.object(Variable, "get", return_value=value)
+
+ interval = VariableInterval(key="test_interval")
+
+ assert interval.resolve() == expected
+
+ @pytest.mark.parametrize(
+ ("value", "raise_runtime", "match"),
+ [
+ (None, True, "not found"),
+ ("abc", False, "must be an integer"),
+ ("", False, "must be an integer"),
+ ("0", False, "must be > 0"),
+ ("-5", False, "must be > 0"),
+ ],
+ )
+ def test_resolve_invalid(self, mocker, value, raise_runtime, match):
+
+ if raise_runtime:
+ mock_err = mock.Mock()
+ mock_err.error.value = "MISSING"
+ mock_err.detail = "missing"
+
+ mocker.patch.object(
+ Variable,
+ "get",
+ side_effect=AirflowRuntimeError(mock_err),
+ )
+ else:
+ mocker.patch.object(Variable, "get", return_value=value)
+
+ interval = VariableInterval(key="test_interval")
+
+ with pytest.raises(ValueError, match=match):
+ interval.resolve()