This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 801f0316a8a fix mypy errors in scheduler_job_runner.py (#58167)
801f0316a8a is described below
commit 801f0316a8a35154dd391a2cd6cd44c2314e75dc
Author: Prafful Javare <[email protected]>
AuthorDate: Tue Nov 11 02:07:21 2025 +0530
fix mypy errors in scheduler_job_runner.py (#58167)
---
.../src/airflow/jobs/scheduler_job_runner.py | 39 ++++++++++++----------
1 file changed, 22 insertions(+), 17 deletions(-)
diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
index fd43c56a6ec..bb81fcbf7a0 100644
--- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
+++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -1646,22 +1646,27 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
# instead of falling in a loop of Integrity Error.
if (dag.dag_id, dag_model.next_dagrun) not in existing_dagruns:
try:
- dag.create_dagrun(
- run_id=dag.timetable.generate_run_id(
- run_type=DagRunType.SCHEDULED,
-
run_after=timezone.coerce_datetime(dag_model.next_dagrun),
+ if dag_model.next_dagrun is not None and
dag_model.next_dagrun_create_after is not None:
+ dag.create_dagrun(
+ run_id=dag.timetable.generate_run_id(
+ run_type=DagRunType.SCHEDULED,
+
run_after=timezone.coerce_datetime(dag_model.next_dagrun),
+ data_interval=data_interval,
+ ),
+ logical_date=dag_model.next_dagrun,
data_interval=data_interval,
- ),
- logical_date=dag_model.next_dagrun,
- data_interval=data_interval,
- run_after=dag_model.next_dagrun_create_after,
- run_type=DagRunType.SCHEDULED,
- triggered_by=DagRunTriggeredByType.TIMETABLE,
- state=DagRunState.QUEUED,
- creating_job_id=self.job.id,
- session=session,
- )
- active_runs_of_dags[dag.dag_id] += 1
+ run_after=dag_model.next_dagrun_create_after,
+ run_type=DagRunType.SCHEDULED,
+ triggered_by=DagRunTriggeredByType.TIMETABLE,
+ state=DagRunState.QUEUED,
+ creating_job_id=self.job.id,
+ session=session,
+ )
+ active_runs_of_dags[dag.dag_id] += 1
+ else:
+ if dag_model.next_dagrun is None:
+ raise ValueError("dag_model.next_dagrun is None;
expected datetime")
+ raise ValueError("dag_model.next_dagrun_create_after
is None; expected datetime")
# Exceptions like ValueError, ParamValidationError, etc. are
raised by
# dag.create_dagrun() when dag is misconfigured. The scheduler
should not
# crash due to misconfigured dags. We should log any exception
encountered
@@ -2284,7 +2289,7 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
@provide_session
def _emit_running_dags_metric(self, session: Session = NEW_SESSION) ->
None:
stmt = select(func.count()).select_from(DagRun).where(DagRun.state ==
DagRunState.RUNNING)
- running_dags = session.scalar(stmt)
+ running_dags = float(session.scalar(stmt))
Stats.gauge("scheduler.dagruns.running", running_dags)
@provide_session
@@ -2547,7 +2552,7 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
@staticmethod
def _generate_task_instance_heartbeat_timeout_message_details(ti: TI) ->
dict[str, Any]:
- task_instance_heartbeat_timeout_message_details = {
+ task_instance_heartbeat_timeout_message_details: dict[str, Any] = {
"DAG Id": ti.dag_id,
"Task Id": ti.task_id,
"Run Id": ti.run_id,