This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new df4026a47a Remove executor_class from Job - fixing backfil for custom
executors (#32219)
df4026a47a is described below
commit df4026a47aa6141142e186ec853ba286c2be0e95
Author: Jarek Potiuk <[email protected]>
AuthorDate: Wed Jun 28 07:32:36 2023 +0200
Remove executor_class from Job - fixing backfil for custom executors
(#32219)
The executor_class in job was used in the past to pass information
about executor being used but it has been replaced by executor
passed via JobRunner when needed. The executor_class has only
been used in backfill job, but it has been used wrongly - because
it had no fully qualified path there.
The right way of using it is to check the executor passed via
JobRunner, and since executor_class is not used any more, we
can safely remove it.
This fixes a bug when custom exceutors could not be used
during backfill.
Co-authored-by: Andrew Halpern
<[email protected]>
---
airflow/jobs/backfill_job_runner.py | 5 +----
airflow/jobs/job.py | 3 ---
tests/jobs/test_base_job.py | 1 -
3 files changed, 1 insertion(+), 8 deletions(-)
diff --git a/airflow/jobs/backfill_job_runner.py
b/airflow/jobs/backfill_job_runner.py
index 6ae6ae27cb..5b13490be7 100644
--- a/airflow/jobs/backfill_job_runner.py
+++ b/airflow/jobs/backfill_job_runner.py
@@ -540,10 +540,7 @@ class BackfillJobRunner(BaseJobRunner[Job], LoggingMixin):
cfg_path = None
- executor_class, _ = ExecutorLoader.import_executor_cls(
- self.job.executor_class,
- )
- if executor_class.is_local:
+ if executor.is_local:
cfg_path = tmp_configuration_copy()
executor.queue_task_instance(
diff --git a/airflow/jobs/job.py b/airflow/jobs/job.py
index 3c2caa3805..99394cbc45 100644
--- a/airflow/jobs/job.py
+++ b/airflow/jobs/job.py
@@ -104,9 +104,6 @@ class Job(Base, LoggingMixin):
self.hostname = get_hostname()
if executor:
self.executor = executor
- self.executor_class = executor.__class__.__name__
- else:
- self.executor_class = conf.get("core", "EXECUTOR")
self.start_date = timezone.utcnow()
self.latest_heartbeat = timezone.utcnow()
if heartrate is not None:
diff --git a/tests/jobs/test_base_job.py b/tests/jobs/test_base_job.py
index c646979237..0e36bd4571 100644
--- a/tests/jobs/test_base_job.py
+++ b/tests/jobs/test_base_job.py
@@ -204,7 +204,6 @@ class TestJob:
test_job = Job(heartrate=10, dag_id="example_dag", state=State.RUNNING)
MockJobRunner(job=test_job)
- assert test_job.executor_class == "SequentialExecutor"
assert test_job.heartrate == 10
assert test_job.dag_id == "example_dag"
assert test_job.hostname == "test_hostname"