This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 9432a3f463 Better typing for Job and JobRunners (#31240)
9432a3f463 is described below

commit 9432a3f4633e1cc75ad853fe6ab9398f1b6856e4
Author: Jarek Potiuk <[email protected]>
AuthorDate: Mon May 15 10:39:19 2023 +0200

    Better typing for Job and JobRunners (#31240)
    
    By avoiding setting the job attribute in BaseJobRunner, the typing for the
    Runners, Job, and JobPydantic is now more complete and accurate.
    
    The Scheduler and Backfill runners limit their code to Job and can use all
    the features that the ORM Job allows them to use.
    
    Other runners are limited to a union of Job and JobPydantic, so that they
    can run on the client side of the internal API without having access to
    all of the Job features.
    
    This is a follow-up to #31182, which fixed the missing job_type for the
    DagProcessor Job and nicely extracted the job into BaseJobRunner, but
    broke the MyPy/typing guards implemented in the runners that should aid
    the AIP-44 implementation.
---
 airflow/jobs/backfill_job_runner.py      | 1 +
 airflow/jobs/base_job_runner.py          | 6 +++---
 airflow/jobs/dag_processor_job_runner.py | 1 +
 airflow/jobs/local_task_job_runner.py    | 1 +
 airflow/jobs/scheduler_job_runner.py     | 1 +
 airflow/jobs/triggerer_job_runner.py     | 1 +
 6 files changed, 8 insertions(+), 3 deletions(-)

diff --git a/airflow/jobs/backfill_job_runner.py 
b/airflow/jobs/backfill_job_runner.py
index 65dfc0b54e..ebc92187f9 100644
--- a/airflow/jobs/backfill_job_runner.py
+++ b/airflow/jobs/backfill_job_runner.py
@@ -149,6 +149,7 @@ class BackfillJobRunner(BaseJobRunner, LoggingMixin):
         :param kwargs:
         """
         super().__init__(job)
+        self.job = job
         self.dag = dag
         self.dag_id = dag.dag_id
         self.bf_start_date = start_date
diff --git a/airflow/jobs/base_job_runner.py b/airflow/jobs/base_job_runner.py
index eeaff887df..5b0af0dbec 100644
--- a/airflow/jobs/base_job_runner.py
+++ b/airflow/jobs/base_job_runner.py
@@ -25,6 +25,7 @@ if TYPE_CHECKING:
     from sqlalchemy.orm import Session
 
     from airflow.jobs.job import Job
+    from airflow.serialization.pydantic.job import JobPydantic
 
 
 class BaseJobRunner:
@@ -32,14 +33,13 @@ class BaseJobRunner:
 
     job_type = "undefined"
 
-    def __init__(self, job):
+    def __init__(self, job: Job | JobPydantic) -> None:
         if job.job_type and job.job_type != self.job_type:
             raise Exception(
                 f"The job is already assigned a different job_type: 
{job.job_type}."
                 f"This is a bug and should be reported."
             )
-        self.job = job
-        self.job.job_type = self.job_type
+        job.job_type = self.job_type
 
     def _execute(self) -> int | None:
         """
diff --git a/airflow/jobs/dag_processor_job_runner.py 
b/airflow/jobs/dag_processor_job_runner.py
index c8e114cb0b..b2420726c6 100644
--- a/airflow/jobs/dag_processor_job_runner.py
+++ b/airflow/jobs/dag_processor_job_runner.py
@@ -47,6 +47,7 @@ class DagProcessorJobRunner(BaseJobRunner, LoggingMixin):
         **kwargs,
     ):
         super().__init__(job)
+        self.job = job
         self.processor = processor
         self.processor.heartbeat = lambda: perform_heartbeat(
             job=self.job,
diff --git a/airflow/jobs/local_task_job_runner.py 
b/airflow/jobs/local_task_job_runner.py
index da821f2288..2fb4eaaba3 100644
--- a/airflow/jobs/local_task_job_runner.py
+++ b/airflow/jobs/local_task_job_runner.py
@@ -88,6 +88,7 @@ class LocalTaskJobRunner(BaseJobRunner, LoggingMixin):
         external_executor_id: str | None = None,
     ):
         super().__init__(job)
+        self.job = job
         LoggingMixin.__init__(self, context=task_instance)
         self.task_instance = task_instance
         self.ignore_all_deps = ignore_all_deps
diff --git a/airflow/jobs/scheduler_job_runner.py 
b/airflow/jobs/scheduler_job_runner.py
index 1c1c2eddcf..080b8bf372 100644
--- a/airflow/jobs/scheduler_job_runner.py
+++ b/airflow/jobs/scheduler_job_runner.py
@@ -157,6 +157,7 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
         processor_poll_interval: float | None = None,
     ):
         super().__init__(job)
+        self.job = job
         self.subdir = subdir
         self.num_runs = num_runs
         # In specific tests, we want to stop the parse loop after the _files_ 
have been parsed a certain
diff --git a/airflow/jobs/triggerer_job_runner.py 
b/airflow/jobs/triggerer_job_runner.py
index d26b52f637..4651b6cf92 100644
--- a/airflow/jobs/triggerer_job_runner.py
+++ b/airflow/jobs/triggerer_job_runner.py
@@ -253,6 +253,7 @@ class TriggererJobRunner(BaseJobRunner, LoggingMixin):
         capacity=None,
     ):
         super().__init__(job)
+        self.job = job
         if capacity is None:
             self.capacity = conf.getint("triggerer", "default_capacity", 
fallback=1000)
         elif isinstance(capacity, int) and capacity > 0:

Reply via email to