This is an automated email from the ASF dual-hosted git repository.

eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 5309218ff28 fix: suppress false error logs for partitioned timetables when next_dagrun fields are None (#63962)
5309218ff28 is described below

commit 5309218ff2870e9a2eacae6dd997619446d44c90
Author: Pranay Kumar Karvi <[email protected]>
AuthorDate: Sun Mar 22 11:44:45 2026 +0530

    fix: suppress false error logs for partitioned timetables when next_dagrun fields are None (#63962)
    
    * fix: suppress false error logs for partitioned timetables when next_dagrun fields are None
    
    * fix: restructure partitioned/non-partitioned timetable checks per review feedback
    
    * fix: correct test assertions for partitioned timetable partition_key check
    
    * fix: update comment capitalization per review nit
---
 .../src/airflow/jobs/scheduler_job_runner.py       | 34 +++++++++------
 airflow-core/tests/unit/jobs/test_scheduler_job.py | 48 ++++++++++++++++++++++
 2 files changed, 70 insertions(+), 12 deletions(-)

diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
index c25e5e2adff..c64bd166f1b 100644
--- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
+++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -1913,18 +1913,28 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
                     active_runs=active_runs_of_dags.get(dag_model.dag_id),
                 )
                 continue
-            if dag_model.next_dagrun is None and dag_model.timetable_partitioned is False:
-                self.log.error(
-                    "dag_model.next_dagrun is None; expected datetime",
-                    dag_id=dag_model.dag_id,
-                )
-                continue
-            if dag_model.next_dagrun_create_after is None:
-                self.log.error(
-                    "dag_model.next_dagrun_create_after is None; expected datetime",
-                    dag_id=dag_model.dag_id,
-                )
-                continue
+            if dag_model.timetable_partitioned is False:
+                # non partition-aware Dags
+                if dag_model.next_dagrun is None:
+                    self.log.error(
+                        "dag_model.next_dagrun is None; expected datetime",
+                        dag_id=dag_model.dag_id,
+                    )
+                    continue
+                if dag_model.next_dagrun_create_after is None:
+                    self.log.error(
+                        "dag_model.next_dagrun_create_after is None; expected datetime",
+                        dag_id=dag_model.dag_id,
+                    )
+                    continue
+            else:
+                # partition-aware Dags
+                if dag_model.next_dagrun_partition_key is None:
+                    self.log.error(
+                        "dag_model.next_dagrun_partition_key is None; expected str",
+                        dag_id=dag_model.dag_id,
+                    )
+                    continue
 
             serdag = self._get_current_dag(dag_id=dag_model.dag_id, session=session)
             if not serdag:
diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py b/airflow-core/tests/unit/jobs/test_scheduler_job.py
index a6719409900..997ebd24bcf 100644
--- a/airflow-core/tests/unit/jobs/test_scheduler_job.py
+++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py
@@ -9045,6 +9045,54 @@ def test_partitioned_dag_run_with_invalid_mapping(dag_maker: DagMaker, session:
     )
 
 
+@pytest.mark.db_test
+def test_create_dag_runs_partitioned_timetable_skips_when_next_fields_none(session, caplog):
+    """
+    Partitioned timetables may leave next_dagrun / next_dagrun_create_after unset when no run is due.
+    Scheduler should skip and log if partition key is not set.
+    """
+    runner = SchedulerJobRunner(
+        job=Job(job_type=SchedulerJobRunner.job_type), executors=[MockExecutor(do_update=False)]
+    )
+    dag_model = MagicMock()
+    dag_model.dag_id = "partitioned-skip-no-next-fields"
+    dag_model.exceeds_max_non_backfill = False
+    dag_model.next_dagrun = None
+    dag_model.timetable_partitioned = True
+    dag_model.next_dagrun_create_after = None
+    dag_model.next_dagrun_partition_key = None
+    dag_model.max_active_runs = 16
+    dag_model.allowed_run_types = None
+
+    with caplog.at_level(logging.ERROR):
+        with mock.patch.object(runner, "_get_current_dag") as mock_get_dag:
+            runner._create_dag_runs([dag_model], session)
+
+    mock_get_dag.assert_not_called()
+    assert "dag_model.next_dagrun_partition_key is None" in caplog.text
+
+
+@pytest.mark.db_test
+def test_create_dag_runs_partitioned_timetable_proceeds_when_partition_key_set(session):
+    runner = SchedulerJobRunner(
+        job=Job(job_type=SchedulerJobRunner.job_type), executors=[MockExecutor(do_update=False)]
+    )
+    dag_model = MagicMock()
+    dag_model.dag_id = "partitioned-proceed-with-partition-key"
+    dag_model.exceeds_max_non_backfill = False
+    dag_model.next_dagrun = None
+    dag_model.timetable_partitioned = True
+    dag_model.next_dagrun_create_after = None
+    dag_model.next_dagrun_partition_key = "partition-a"
+    dag_model.max_active_runs = 16
+    dag_model.allowed_run_types = None
+
+    with mock.patch.object(runner, "_get_current_dag", return_value=None) as mock_get_dag:
+        runner._create_dag_runs([dag_model], session)
+
+    mock_get_dag.assert_called_once()
+
+
 @pytest.mark.need_serialized_dag
 @pytest.mark.usefixtures("clear_asset_partition_rows")
 def test_partitioned_dag_run_with_customized_mapper(

Reply via email to