ashb commented on a change in pull request #16700:
URL: https://github.com/apache/airflow/pull/16700#discussion_r660365084



##########
File path: airflow/jobs/backfill_job.py
##########
@@ -183,7 +186,8 @@ def __init__(
         self.conf = conf
         self.rerun_failed_tasks = rerun_failed_tasks
         self.run_backwards = run_backwards
-        super().__init__(*args, **kwargs)
+        self.executor = executor or ExecutorLoader.get_default_executor()
+        super().__init__(executor=self.executor, *args, **kwargs)

Review comment:
       How about instead of doing this we make `executor` a lazy/cached 
property on base job?
   
   That way it is not loaded until it is first accessed, which might fix the 
problem? 

##########
File path: airflow/jobs/base_job.py
##########
@@ -94,8 +95,11 @@ class BaseJob(Base, LoggingMixin):
 
     def __init__(self, executor=None, heartrate=None, *args, **kwargs):
         self.hostname = get_hostname()
-        self.executor = executor or ExecutorLoader.get_default_executor()
-        self.executor_class = self.executor.__class__.__name__
+        self._executor = executor
+        if self._executor:
+            self.executor_class = self._executor.__class__.__name__
+        else:
+            self.executor_class = conf.get('core', 'EXECUTOR')

Review comment:
       Can be much simpler
   
   ```suggestion
           if executor:
               self.executor
   ```

##########
File path: airflow/jobs/base_job.py
##########
@@ -104,6 +108,11 @@ def __init__(self, executor=None, heartrate=None, *args, 
**kwargs):
         self.max_tis_per_query = conf.getint('scheduler', 'max_tis_per_query')
         super().__init__(*args, **kwargs)
 
+    @cached_property
+    def executor(self):
+        """Provided Executor which defaults to the one mentioned via Airflow 
configuration"""
+        return self._executor or ExecutorLoader.get_default_executor()
+

Review comment:
       ```suggestion
       @cached_property
       def executor(self):
           return ExecutorLoader.get_default_executor()
   
       @property
       def executor_class(self):
           return self.executor.__class__.__name__
   ```

##########
File path: airflow/jobs/base_job.py
##########
@@ -94,8 +95,11 @@ class BaseJob(Base, LoggingMixin):
 
     def __init__(self, executor=None, heartrate=None, *args, **kwargs):
         self.hostname = get_hostname()
-        self.executor = executor or ExecutorLoader.get_default_executor()
-        self.executor_class = self.executor.__class__.__name__
+        self._executor = executor
+        if self._executor:
+            self.executor_class = self._executor.__class__.__name__
+        else:
+            self.executor_class = conf.get('core', 'EXECUTOR')

Review comment:
       Can be much simpler
   
   ```suggestion
           if executor:
               self.executor = executor
   ```

##########
File path: airflow/jobs/base_job.py
##########
@@ -104,6 +108,11 @@ def __init__(self, executor=None, heartrate=None, *args, 
**kwargs):
         self.max_tis_per_query = conf.getint('scheduler', 'max_tis_per_query')
         super().__init__(*args, **kwargs)
 
+    @cached_property
+    def executor(self):
+        """Provided Executor which defaults to the one mentioned via Airflow 
configuration"""
+        return self._executor or ExecutorLoader.get_default_executor()
+

Review comment:
       Ah, I didn't realise that. In which case change this to a 
`@cached_property` too, and then set it in the `if executor:` block.

##########
File path: airflow/jobs/base_job.py
##########
@@ -104,6 +108,11 @@ def __init__(self, executor=None, heartrate=None, *args, 
**kwargs):
         self.max_tis_per_query = conf.getint('scheduler', 'max_tis_per_query')
         super().__init__(*args, **kwargs)
 
+    @cached_property
+    def executor(self):
+        """Provided Executor which defaults to the one mentioned via Airflow 
configuration"""
+        return self._executor or ExecutorLoader.get_default_executor()
+

Review comment:
       Oh right no. Okay.

##########
File path: airflow/jobs/base_job.py
##########
@@ -104,6 +108,11 @@ def __init__(self, executor=None, heartrate=None, *args, 
**kwargs):
         self.max_tis_per_query = conf.getint('scheduler', 'max_tis_per_query')
         super().__init__(*args, **kwargs)
 
+    @cached_property
+    def executor(self):
+        """Provided Executor which defaults to the one mentioned via Airflow 
configuration"""
+        return self._executor or ExecutorLoader.get_default_executor()
+

Review comment:
       Leave executor_class set as a normal attribute (as you had it) -- we 
just don't need `_executor` but can directly set `self.executor`

##########
File path: airflow/jobs/base_job.py
##########
@@ -94,8 +95,11 @@ class BaseJob(Base, LoggingMixin):
 
     def __init__(self, executor=None, heartrate=None, *args, **kwargs):
         self.hostname = get_hostname()
-        self.executor = executor or ExecutorLoader.get_default_executor()
-        self.executor_class = self.executor.__class__.__name__
+        self._executor = executor
+        if self._executor:
+            self.executor_class = self._executor.__class__.__name__
+        else:
+            self.executor_class = conf.get('core', 'EXECUTOR')

Review comment:
       Can be much simpler
   
   ```suggestion
           if executor:
               self.executor = executor
               self.executor_class = executor.__class__.__name__
           else:
               self.executor_class = conf.get('core', 'EXECUTOR')
   ```

##########
File path: airflow/jobs/base_job.py
##########
@@ -104,6 +108,11 @@ def __init__(self, executor=None, heartrate=None, *args, 
**kwargs):
         self.max_tis_per_query = conf.getint('scheduler', 'max_tis_per_query')
         super().__init__(*args, **kwargs)
 
+    @cached_property
+    def executor(self):
+        """Provided Executor which defaults to the one mentioned via Airflow 
configuration"""
+        return self._executor or ExecutorLoader.get_default_executor()

Review comment:
       ```suggestion
       @cached_property
       def executor(self):
           return  ExecutorLoader.get_default_executor()
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to