ashb commented on a change in pull request #16700:
URL: https://github.com/apache/airflow/pull/16700#discussion_r660365084
##########
File path: airflow/jobs/backfill_job.py
##########
@@ -183,7 +186,8 @@ def __init__(
self.conf = conf
self.rerun_failed_tasks = rerun_failed_tasks
self.run_backwards = run_backwards
- super().__init__(*args, **kwargs)
+ self.executor = executor or ExecutorLoader.get_default_executor()
+ super().__init__(executor=self.executor, *args, **kwargs)
Review comment:
How about, instead of doing this, we make `executor` a lazy/cached
property on `BaseJob`?
That way it is not loaded until it is first accessed, which might fix the
problem.
##########
File path: airflow/jobs/base_job.py
##########
@@ -94,8 +95,11 @@ class BaseJob(Base, LoggingMixin):
def __init__(self, executor=None, heartrate=None, *args, **kwargs):
self.hostname = get_hostname()
- self.executor = executor or ExecutorLoader.get_default_executor()
- self.executor_class = self.executor.__class__.__name__
+ self._executor = executor
+ if self._executor:
+ self.executor_class = self._executor.__class__.__name__
+ else:
+ self.executor_class = conf.get('core', 'EXECUTOR')
Review comment:
Can be much simpler
```suggestion
        if executor:
            self.executor = executor
```
##########
File path: airflow/jobs/base_job.py
##########
@@ -104,6 +108,11 @@ def __init__(self, executor=None, heartrate=None, *args, **kwargs):
self.max_tis_per_query = conf.getint('scheduler', 'max_tis_per_query')
super().__init__(*args, **kwargs)
+ @cached_property
+ def executor(self):
+ """Provided Executor which defaults to the one mentioned via Airflow configuration"""
+ return self._executor or ExecutorLoader.get_default_executor()
+
Review comment:
```suggestion
    @cached_property
    def executor(self):
        return ExecutorLoader.get_default_executor()

    @property
    def executor_class(self):
        return self.executor.__class__.__name__
```
##########
File path: airflow/jobs/base_job.py
##########
@@ -94,8 +95,11 @@ class BaseJob(Base, LoggingMixin):
def __init__(self, executor=None, heartrate=None, *args, **kwargs):
self.hostname = get_hostname()
- self.executor = executor or ExecutorLoader.get_default_executor()
- self.executor_class = self.executor.__class__.__name__
+ self._executor = executor
+ if self._executor:
+ self.executor_class = self._executor.__class__.__name__
+ else:
+ self.executor_class = conf.get('core', 'EXECUTOR')
Review comment:
Can be much simpler
```suggestion
        if executor:
            self.executor = executor
```
##########
File path: airflow/jobs/base_job.py
##########
@@ -104,6 +108,11 @@ def __init__(self, executor=None, heartrate=None, *args, **kwargs):
self.max_tis_per_query = conf.getint('scheduler', 'max_tis_per_query')
super().__init__(*args, **kwargs)
+ @cached_property
+ def executor(self):
+ """Provided Executor which defaults to the one mentioned via Airflow configuration"""
+ return self._executor or ExecutorLoader.get_default_executor()
+
Review comment:
Ah, I didn't realise that. In that case, change this to a
`@cached_property` too, and then set it in the `if executor:` block.
##########
File path: airflow/jobs/base_job.py
##########
@@ -104,6 +108,11 @@ def __init__(self, executor=None, heartrate=None, *args, **kwargs):
self.max_tis_per_query = conf.getint('scheduler', 'max_tis_per_query')
super().__init__(*args, **kwargs)
+ @cached_property
+ def executor(self):
+ """Provided Executor which defaults to the one mentioned via Airflow configuration"""
+ return self._executor or ExecutorLoader.get_default_executor()
+
Review comment:
Oh right, no. Okay.
##########
File path: airflow/jobs/base_job.py
##########
@@ -104,6 +108,11 @@ def __init__(self, executor=None, heartrate=None, *args, **kwargs):
self.max_tis_per_query = conf.getint('scheduler', 'max_tis_per_query')
super().__init__(*args, **kwargs)
+ @cached_property
+ def executor(self):
+ """Provided Executor which defaults to the one mentioned via Airflow configuration"""
+ return self._executor or ExecutorLoader.get_default_executor()
+
Review comment:
Leave `executor_class` set as a normal attribute (as you had it) -- we
just don't need `_executor`; we can set `self.executor` directly.
##########
File path: airflow/jobs/base_job.py
##########
@@ -94,8 +95,11 @@ class BaseJob(Base, LoggingMixin):
def __init__(self, executor=None, heartrate=None, *args, **kwargs):
self.hostname = get_hostname()
- self.executor = executor or ExecutorLoader.get_default_executor()
- self.executor_class = self.executor.__class__.__name__
+ self._executor = executor
+ if self._executor:
+ self.executor_class = self._executor.__class__.__name__
+ else:
+ self.executor_class = conf.get('core', 'EXECUTOR')
Review comment:
Can be much simpler
```suggestion
        if executor:
            self.executor = executor
            self.executor_class = executor.__class__.__name__
        else:
            self.executor_class = conf.get('core', 'EXECUTOR')
```
##########
File path: airflow/jobs/base_job.py
##########
@@ -104,6 +108,11 @@ def __init__(self, executor=None, heartrate=None, *args, **kwargs):
self.max_tis_per_query = conf.getint('scheduler', 'max_tis_per_query')
super().__init__(*args, **kwargs)
+ @cached_property
+ def executor(self):
+ """Provided Executor which defaults to the one mentioned via Airflow configuration"""
+ return self._executor or ExecutorLoader.get_default_executor()
Review comment:
```suggestion
    @cached_property
    def executor(self):
        return ExecutorLoader.get_default_executor()
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]