This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 9432a3f463 Better typing for Job and JobRunners (#31240)
9432a3f463 is described below
commit 9432a3f4633e1cc75ad853fe6ab9398f1b6856e4
Author: Jarek Potiuk <[email protected]>
AuthorDate: Mon May 15 10:39:19 2023 +0200
Better typing for Job and JobRunners (#31240)
By no longer setting the job in BaseJobRunner, the typing for the runners,
Job, and JobPydantic is now more complete and accurate.
The Scheduler and Backfill runners restrict their code to Job and can use
everything that the ORM Job allows them to do.
Other runners are restricted to the union of Job and JobPydantic, so
that they can run on the client side of the internal API without
having access to all of the Job features.
This is a follow-up to #31182, which fixed the missing job_type for the
DagProcessor Job and neatly extracted the job into BaseJobRunner, but broke
the MyPy typing guards implemented in the runners that are meant to aid the
AIP-44 implementation.
---
airflow/jobs/backfill_job_runner.py | 1 +
airflow/jobs/base_job_runner.py | 6 +++---
airflow/jobs/dag_processor_job_runner.py | 1 +
airflow/jobs/local_task_job_runner.py | 1 +
airflow/jobs/scheduler_job_runner.py | 1 +
airflow/jobs/triggerer_job_runner.py | 1 +
6 files changed, 8 insertions(+), 3 deletions(-)
diff --git a/airflow/jobs/backfill_job_runner.py
b/airflow/jobs/backfill_job_runner.py
index 65dfc0b54e..ebc92187f9 100644
--- a/airflow/jobs/backfill_job_runner.py
+++ b/airflow/jobs/backfill_job_runner.py
@@ -149,6 +149,7 @@ class BackfillJobRunner(BaseJobRunner, LoggingMixin):
:param kwargs:
"""
super().__init__(job)
+ self.job = job
self.dag = dag
self.dag_id = dag.dag_id
self.bf_start_date = start_date
diff --git a/airflow/jobs/base_job_runner.py b/airflow/jobs/base_job_runner.py
index eeaff887df..5b0af0dbec 100644
--- a/airflow/jobs/base_job_runner.py
+++ b/airflow/jobs/base_job_runner.py
@@ -25,6 +25,7 @@ if TYPE_CHECKING:
from sqlalchemy.orm import Session
from airflow.jobs.job import Job
+ from airflow.serialization.pydantic.job import JobPydantic
class BaseJobRunner:
@@ -32,14 +33,13 @@ class BaseJobRunner:
job_type = "undefined"
- def __init__(self, job):
+ def __init__(self, job: Job | JobPydantic) -> None:
if job.job_type and job.job_type != self.job_type:
raise Exception(
f"The job is already assigned a different job_type:
{job.job_type}."
f"This is a bug and should be reported."
)
- self.job = job
- self.job.job_type = self.job_type
+ job.job_type = self.job_type
def _execute(self) -> int | None:
"""
diff --git a/airflow/jobs/dag_processor_job_runner.py
b/airflow/jobs/dag_processor_job_runner.py
index c8e114cb0b..b2420726c6 100644
--- a/airflow/jobs/dag_processor_job_runner.py
+++ b/airflow/jobs/dag_processor_job_runner.py
@@ -47,6 +47,7 @@ class DagProcessorJobRunner(BaseJobRunner, LoggingMixin):
**kwargs,
):
super().__init__(job)
+ self.job = job
self.processor = processor
self.processor.heartbeat = lambda: perform_heartbeat(
job=self.job,
diff --git a/airflow/jobs/local_task_job_runner.py
b/airflow/jobs/local_task_job_runner.py
index da821f2288..2fb4eaaba3 100644
--- a/airflow/jobs/local_task_job_runner.py
+++ b/airflow/jobs/local_task_job_runner.py
@@ -88,6 +88,7 @@ class LocalTaskJobRunner(BaseJobRunner, LoggingMixin):
external_executor_id: str | None = None,
):
super().__init__(job)
+ self.job = job
LoggingMixin.__init__(self, context=task_instance)
self.task_instance = task_instance
self.ignore_all_deps = ignore_all_deps
diff --git a/airflow/jobs/scheduler_job_runner.py
b/airflow/jobs/scheduler_job_runner.py
index 1c1c2eddcf..080b8bf372 100644
--- a/airflow/jobs/scheduler_job_runner.py
+++ b/airflow/jobs/scheduler_job_runner.py
@@ -157,6 +157,7 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
processor_poll_interval: float | None = None,
):
super().__init__(job)
+ self.job = job
self.subdir = subdir
self.num_runs = num_runs
# In specific tests, we want to stop the parse loop after the _files_
have been parsed a certain
diff --git a/airflow/jobs/triggerer_job_runner.py
b/airflow/jobs/triggerer_job_runner.py
index d26b52f637..4651b6cf92 100644
--- a/airflow/jobs/triggerer_job_runner.py
+++ b/airflow/jobs/triggerer_job_runner.py
@@ -253,6 +253,7 @@ class TriggererJobRunner(BaseJobRunner, LoggingMixin):
capacity=None,
):
super().__init__(job)
+ self.job = job
if capacity is None:
self.capacity = conf.getint("triggerer", "default_capacity",
fallback=1000)
elif isinstance(capacity, int) and capacity > 0: