kaxil commented on a change in pull request #16700:
URL: https://github.com/apache/airflow/pull/16700#discussion_r660201602



##########
File path: airflow/jobs/base_job.py
##########
@@ -94,7 +94,8 @@ class BaseJob(Base, LoggingMixin):
 
     def __init__(self, executor=None, heartrate=None, *args, **kwargs):
         self.hostname = get_hostname()
-        self.executor = executor or ExecutorLoader.get_default_executor()
+        if self.__class__.__name__ != "LocalTaskJob":
+            self.executor = executor or ExecutorLoader.get_default_executor()

Review comment:
       The other option is to keep it `None` in `BaseJob` and override it in 
`SchedulerJob` and `BackfillJob`
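   
   A rough sketch of that alternative (hypothetical, simplified classes; `default_executor()` stands in for `ExecutorLoader.get_default_executor()`):
   
```python
# Sketch of the "override in subclasses" option: BaseJob never resolves an
# executor, so LocalTaskJob never instantiates one; only the job types that
# actually dispatch work through an executor load the default.
def default_executor():
    """Stand-in for ExecutorLoader.get_default_executor()."""
    return object()


class BaseJob:
    def __init__(self, executor=None, *args, **kwargs):
        self.executor = None  # never loaded here


class SchedulerJob(BaseJob):
    def __init__(self, executor=None, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.executor = executor or default_executor()


class BackfillJob(BaseJob):
    def __init__(self, executor=None, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.executor = executor or default_executor()
```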

##########
File path: airflow/jobs/base_job.py
##########
@@ -94,7 +94,8 @@ class BaseJob(Base, LoggingMixin):
 
     def __init__(self, executor=None, heartrate=None, *args, **kwargs):
         self.hostname = get_hostname()
-        self.executor = executor or ExecutorLoader.get_default_executor()
+        if self.__class__.__name__ != "LocalTaskJob":
+            self.executor = executor or ExecutorLoader.get_default_executor()

Review comment:
       
https://github.com/apache/airflow/pull/16700/commits/a7f6efbabc94512fcf6b8ada80f13b7291a0d2c8 vs https://github.com/apache/airflow/pull/16700/commits/376ffd1deff481717758281608b0764f38753956
   
   Thoughts?

##########
File path: airflow/jobs/base_job.py
##########
@@ -94,7 +94,8 @@ class BaseJob(Base, LoggingMixin):
 
     def __init__(self, executor=None, heartrate=None, *args, **kwargs):
         self.hostname = get_hostname()
-        self.executor = executor or ExecutorLoader.get_default_executor()
+        if self.__class__.__name__ != "LocalTaskJob":
+            self.executor = executor or ExecutorLoader.get_default_executor()

Review comment:
      > I do also wonder.... why is it that we only get this issue with CKE? Is there perhaps something about the way in which CKE is designed that causes this problem?
   
   The issue is that it tries to create an instance of `KubernetesExecutor` inside `CeleryExecutor`, and `KubernetesExecutor` creates a multiprocessing Manager & Queue in its `__init__`, which causes issues with Celery, as explained in https://github.com/celery/celery/issues/4525
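   
   For illustration only (this is not Airflow code), the shape of the problem: a class that builds its multiprocessing machinery in `__init__` spawns a Manager process the moment it is constructed, even if it never runs anything, which is exactly what we don't want happening inside a Celery worker process:
   
```python
# Toy illustration of eager vs. deferred creation of multiprocessing
# resources. Constructing EagerExecutor immediately spawns a Manager helper
# process; LazyExecutor only does so when start() is called.
import multiprocessing


class EagerExecutor:
    def __init__(self):
        # Spawns a manager process as soon as the object is created.
        self.manager = multiprocessing.Manager()
        self.result_queue = self.manager.Queue()


class LazyExecutor:
    def __init__(self):
        self.manager = None
        self.result_queue = None

    def start(self):
        # Resources are only created when the executor is actually started.
        self.manager = multiprocessing.Manager()
        self.result_queue = self.manager.Queue()


if __name__ == "__main__":
    lazy = LazyExecutor()    # safe to construct anywhere, no child process yet
    eager = EagerExecutor()  # already spawned a Manager process here
```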

##########
File path: airflow/jobs/base_job.py
##########
@@ -104,6 +108,11 @@ def __init__(self, executor=None, heartrate=None, *args, **kwargs):
         self.max_tis_per_query = conf.getint('scheduler', 'max_tis_per_query')
         super().__init__(*args, **kwargs)
 
+    @cached_property
+    def executor(self):
+        """Provided Executor which defaults to the one mentioned via Airflow configuration"""
+        return self._executor or ExecutorLoader.get_default_executor()
+

Review comment:
      Well, this will cause the same issue: `executor_class` is stored in the DB.
   
   So when the Job model is saved to the DB it accesses `self.executor_class`, which evaluates `self.executor`, and that instantiates the Executor, which is what we want to avoid.
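   
   Roughly the chain being described (a simplified sketch with hypothetical stand-ins for the ORM pieces, not the real `BaseJob`):
   
```python
from functools import cached_property


class DefaultExecutor:
    """Stand-in for whatever ExecutorLoader.get_default_executor() returns."""


class BaseJob:
    def __init__(self, executor=None):
        self._executor = executor

    @cached_property
    def executor(self):
        # The lazy property from the diff above.
        return self._executor or DefaultExecutor()

    @property
    def executor_class(self):
        # In the real model this value is persisted as a column on the Job
        # table, so it is read whenever the row is written to the DB.
        return type(self.executor).__name__


job = BaseJob()
# Saving the Job row reads executor_class, which pulls on job.executor and
# instantiates the executor anyway, defeating the lazy loading.
print(job.executor_class)  # -> "DefaultExecutor"
```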




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
