This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new df4026a47a Remove executor_class from Job - fixing backfill for custom 
executors (#32219)
df4026a47a is described below

commit df4026a47aa6141142e186ec853ba286c2be0e95
Author: Jarek Potiuk <[email protected]>
AuthorDate: Wed Jun 28 07:32:36 2023 +0200

    Remove executor_class from Job - fixing backfill for custom executors 
(#32219)
    
    The executor_class in job was used in the past to pass information
    about executor being used but it has been replaced by executor
    passed via JobRunner when needed. The executor_class has only
    been used in backfill job, but it has been used wrongly - because
    it had no fully qualified path there.
    
    The right way of using it is to check the executor passed via
    JobRunner, and since executor_class is not used any more, we
    can safely remove it.
    
    This fixes a bug where custom executors could not be used
    during backfill.
    
    Co-authored-by: Andrew Halpern 
<[email protected]>
---
 airflow/jobs/backfill_job_runner.py | 5 +----
 airflow/jobs/job.py                 | 3 ---
 tests/jobs/test_base_job.py         | 1 -
 3 files changed, 1 insertion(+), 8 deletions(-)

diff --git a/airflow/jobs/backfill_job_runner.py 
b/airflow/jobs/backfill_job_runner.py
index 6ae6ae27cb..5b13490be7 100644
--- a/airflow/jobs/backfill_job_runner.py
+++ b/airflow/jobs/backfill_job_runner.py
@@ -540,10 +540,7 @@ class BackfillJobRunner(BaseJobRunner[Job], LoggingMixin):
 
                         cfg_path = None
 
-                        executor_class, _ = ExecutorLoader.import_executor_cls(
-                            self.job.executor_class,
-                        )
-                        if executor_class.is_local:
+                        if executor.is_local:
                             cfg_path = tmp_configuration_copy()
 
                         executor.queue_task_instance(
diff --git a/airflow/jobs/job.py b/airflow/jobs/job.py
index 3c2caa3805..99394cbc45 100644
--- a/airflow/jobs/job.py
+++ b/airflow/jobs/job.py
@@ -104,9 +104,6 @@ class Job(Base, LoggingMixin):
         self.hostname = get_hostname()
         if executor:
             self.executor = executor
-            self.executor_class = executor.__class__.__name__
-        else:
-            self.executor_class = conf.get("core", "EXECUTOR")
         self.start_date = timezone.utcnow()
         self.latest_heartbeat = timezone.utcnow()
         if heartrate is not None:
diff --git a/tests/jobs/test_base_job.py b/tests/jobs/test_base_job.py
index c646979237..0e36bd4571 100644
--- a/tests/jobs/test_base_job.py
+++ b/tests/jobs/test_base_job.py
@@ -204,7 +204,6 @@ class TestJob:
 
         test_job = Job(heartrate=10, dag_id="example_dag", state=State.RUNNING)
         MockJobRunner(job=test_job)
-        assert test_job.executor_class == "SequentialExecutor"
         assert test_job.heartrate == 10
         assert test_job.dag_id == "example_dag"
         assert test_job.hostname == "test_hostname"

Reply via email to