dstandish commented on a change in pull request #16700:
URL: https://github.com/apache/airflow/pull/16700#discussion_r660258928
##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -39,7 +39,8 @@
from airflow.dag_processing.manager import DagFileProcessorAgent
from airflow.dag_processing.processor import DagFileProcessorProcess
from airflow.exceptions import SerializedDagNotFound
-from airflow.executors.executor_loader import UNPICKLEABLE_EXECUTORS
+from airflow.executors.base_executor import BaseExecutor
Review comment:
i think this could go behind `if TYPE_CHECKING`
##########
File path: airflow/jobs/base_job.py
##########
@@ -94,7 +94,8 @@ class BaseJob(Base, LoggingMixin):
def __init__(self, executor=None, heartrate=None, *args, **kwargs):
self.hostname = get_hostname()
- self.executor = executor or ExecutorLoader.get_default_executor()
+ if self.__class__.__name__ != "LocalTaskJob":
+ self.executor = executor or ExecutorLoader.get_default_executor()
Review comment:
another option would be to make `executor` a cached property and
`executor_class` a property. this would simply get them out of init, which i
think would be enough if they are not accessed in `LocalTaskJob` anyway.
```python
@cached_property
def executor(self):
return self._executor or ExecutorLoader.get_default_executor() # store
init param `executor` as private attr `_executor`
```
this has two benefits, one is you don't have to reference the subclass name
here (as in your first approach) and the other is you don't have to make as
many changes re executor in init params)
##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -110,11 +111,14 @@ def __init__(
processor_poll_interval: float = conf.getfloat('scheduler',
'processor_poll_interval'),
do_pickle: bool = False,
log: logging.Logger = None,
+ executor: Optional[BaseExecutor] = None,
Review comment:
missing from docstring
##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -39,7 +39,8 @@
from airflow.dag_processing.manager import DagFileProcessorAgent
from airflow.dag_processing.processor import DagFileProcessorProcess
from airflow.exceptions import SerializedDagNotFound
-from airflow.executors.executor_loader import UNPICKLEABLE_EXECUTORS
+from airflow.executors.base_executor import BaseExecutor
Review comment:
though.... not sure it matters e.g. if already imported implicitly ....
but just in case i figured i'd point it out
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]