This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 801f0316a8a fix mypy errors in scheduler_job_runner.py (#58167)
801f0316a8a is described below

commit 801f0316a8a35154dd391a2cd6cd44c2314e75dc
Author: Prafful Javare <[email protected]>
AuthorDate: Tue Nov 11 02:07:21 2025 +0530

    fix mypy errors in scheduler_job_runner.py (#58167)
---
 .../src/airflow/jobs/scheduler_job_runner.py       | 39 ++++++++++++----------
 1 file changed, 22 insertions(+), 17 deletions(-)

diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py 
b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
index fd43c56a6ec..bb81fcbf7a0 100644
--- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
+++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -1646,22 +1646,27 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
             # instead of falling in a loop of Integrity Error.
             if (dag.dag_id, dag_model.next_dagrun) not in existing_dagruns:
                 try:
-                    dag.create_dagrun(
-                        run_id=dag.timetable.generate_run_id(
-                            run_type=DagRunType.SCHEDULED,
-                            
run_after=timezone.coerce_datetime(dag_model.next_dagrun),
+                    if dag_model.next_dagrun is not None and 
dag_model.next_dagrun_create_after is not None:
+                        dag.create_dagrun(
+                            run_id=dag.timetable.generate_run_id(
+                                run_type=DagRunType.SCHEDULED,
+                                
run_after=timezone.coerce_datetime(dag_model.next_dagrun),
+                                data_interval=data_interval,
+                            ),
+                            logical_date=dag_model.next_dagrun,
                             data_interval=data_interval,
-                        ),
-                        logical_date=dag_model.next_dagrun,
-                        data_interval=data_interval,
-                        run_after=dag_model.next_dagrun_create_after,
-                        run_type=DagRunType.SCHEDULED,
-                        triggered_by=DagRunTriggeredByType.TIMETABLE,
-                        state=DagRunState.QUEUED,
-                        creating_job_id=self.job.id,
-                        session=session,
-                    )
-                    active_runs_of_dags[dag.dag_id] += 1
+                            run_after=dag_model.next_dagrun_create_after,
+                            run_type=DagRunType.SCHEDULED,
+                            triggered_by=DagRunTriggeredByType.TIMETABLE,
+                            state=DagRunState.QUEUED,
+                            creating_job_id=self.job.id,
+                            session=session,
+                        )
+                        active_runs_of_dags[dag.dag_id] += 1
+                    else:
+                        if dag_model.next_dagrun is None:
+                            raise ValueError("dag_model.next_dagrun is None; 
expected datetime")
+                        raise ValueError("dag_model.next_dagrun_create_after 
is None; expected datetime")
                 # Exceptions like ValueError, ParamValidationError, etc. are 
raised by
                 # dag.create_dagrun() when dag is misconfigured. The scheduler 
should not
                 # crash due to misconfigured dags. We should log any exception 
encountered
@@ -2284,7 +2289,7 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
     @provide_session
     def _emit_running_dags_metric(self, session: Session = NEW_SESSION) -> 
None:
         stmt = select(func.count()).select_from(DagRun).where(DagRun.state == 
DagRunState.RUNNING)
-        running_dags = session.scalar(stmt)
+        running_dags = float(session.scalar(stmt))
         Stats.gauge("scheduler.dagruns.running", running_dags)
 
     @provide_session
@@ -2547,7 +2552,7 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
 
     @staticmethod
     def _generate_task_instance_heartbeat_timeout_message_details(ti: TI) -> 
dict[str, Any]:
-        task_instance_heartbeat_timeout_message_details = {
+        task_instance_heartbeat_timeout_message_details: dict[str, Any] = {
             "DAG Id": ti.dag_id,
             "Task Id": ti.task_id,
             "Run Id": ti.run_id,

Reply via email to