This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 5309218ff28 fix: suppress false error logs for partitioned timetables
when next_dagrun fields are None (#63962)
5309218ff28 is described below
commit 5309218ff2870e9a2eacae6dd997619446d44c90
Author: Pranay Kumar Karvi <[email protected]>
AuthorDate: Sun Mar 22 11:44:45 2026 +0530
fix: suppress false error logs for partitioned timetables when next_dagrun
fields are None (#63962)
* fix: suppress false error logs for partitioned timetables when
next_dagrun fields are None
* fix: restructure partitioned/non-partitioned timetable checks per review
feedback
* fix: correct test assertions for partitioned timetable partition_key check
* fix: update comment capitalization per review nit
---
.../src/airflow/jobs/scheduler_job_runner.py | 34 +++++++++------
airflow-core/tests/unit/jobs/test_scheduler_job.py | 48 ++++++++++++++++++++++
2 files changed, 70 insertions(+), 12 deletions(-)
diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
index c25e5e2adff..c64bd166f1b 100644
--- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
+++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -1913,18 +1913,28 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
active_runs=active_runs_of_dags.get(dag_model.dag_id),
)
continue
- if dag_model.next_dagrun is None and
dag_model.timetable_partitioned is False:
- self.log.error(
- "dag_model.next_dagrun is None; expected datetime",
- dag_id=dag_model.dag_id,
- )
- continue
- if dag_model.next_dagrun_create_after is None:
- self.log.error(
- "dag_model.next_dagrun_create_after is None; expected
datetime",
- dag_id=dag_model.dag_id,
- )
- continue
+ if dag_model.timetable_partitioned is False:
+ # non partition-aware Dags
+ if dag_model.next_dagrun is None:
+ self.log.error(
+ "dag_model.next_dagrun is None; expected datetime",
+ dag_id=dag_model.dag_id,
+ )
+ continue
+ if dag_model.next_dagrun_create_after is None:
+ self.log.error(
+ "dag_model.next_dagrun_create_after is None; expected
datetime",
+ dag_id=dag_model.dag_id,
+ )
+ continue
+ else:
+ # partition-aware Dags
+ if dag_model.next_dagrun_partition_key is None:
+ self.log.error(
+ "dag_model.next_dagrun_partition_key is None; expected
str",
+ dag_id=dag_model.dag_id,
+ )
+ continue
serdag = self._get_current_dag(dag_id=dag_model.dag_id,
session=session)
if not serdag:
diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py
b/airflow-core/tests/unit/jobs/test_scheduler_job.py
index a6719409900..997ebd24bcf 100644
--- a/airflow-core/tests/unit/jobs/test_scheduler_job.py
+++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py
@@ -9045,6 +9045,54 @@ def
test_partitioned_dag_run_with_invalid_mapping(dag_maker: DagMaker, session:
)
[email protected]_test
+def
test_create_dag_runs_partitioned_timetable_skips_when_next_fields_none(session,
caplog):
+ """
+ Partitioned timetables may leave next_dagrun / next_dagrun_create_after
unset when no run is due.
+ Scheduler should skip and log if partition key is not set.
+ """
+ runner = SchedulerJobRunner(
+ job=Job(job_type=SchedulerJobRunner.job_type),
executors=[MockExecutor(do_update=False)]
+ )
+ dag_model = MagicMock()
+ dag_model.dag_id = "partitioned-skip-no-next-fields"
+ dag_model.exceeds_max_non_backfill = False
+ dag_model.next_dagrun = None
+ dag_model.timetable_partitioned = True
+ dag_model.next_dagrun_create_after = None
+ dag_model.next_dagrun_partition_key = None
+ dag_model.max_active_runs = 16
+ dag_model.allowed_run_types = None
+
+ with caplog.at_level(logging.ERROR):
+ with mock.patch.object(runner, "_get_current_dag") as mock_get_dag:
+ runner._create_dag_runs([dag_model], session)
+
+ mock_get_dag.assert_not_called()
+ assert "dag_model.next_dagrun_partition_key is None" in caplog.text
+
+
[email protected]_test
+def
test_create_dag_runs_partitioned_timetable_proceeds_when_partition_key_set(session):
+ runner = SchedulerJobRunner(
+ job=Job(job_type=SchedulerJobRunner.job_type),
executors=[MockExecutor(do_update=False)]
+ )
+ dag_model = MagicMock()
+ dag_model.dag_id = "partitioned-proceed-with-partition-key"
+ dag_model.exceeds_max_non_backfill = False
+ dag_model.next_dagrun = None
+ dag_model.timetable_partitioned = True
+ dag_model.next_dagrun_create_after = None
+ dag_model.next_dagrun_partition_key = "partition-a"
+ dag_model.max_active_runs = 16
+ dag_model.allowed_run_types = None
+
+ with mock.patch.object(runner, "_get_current_dag", return_value=None) as
mock_get_dag:
+ runner._create_dag_runs([dag_model], session)
+
+ mock_get_dag.assert_called_once()
+
+
@pytest.mark.need_serialized_dag
@pytest.mark.usefixtures("clear_asset_partition_rows")
def test_partitioned_dag_run_with_customized_mapper(