ferruzzi commented on code in PR #61153:
URL: https://github.com/apache/airflow/pull/61153#discussion_r2830604821


##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -3201,22 +3313,28 @@ def _try_to_load_executor(self, ti: TaskInstance, session, team_name=NOTSET) ->
                     # No executor found for that team, fall back to global default
                     executor = self.executor
         else:
-            # An executor is specified on the TaskInstance (as a str), so we need to find it in the list of executors
+            # An executor is specified on the workload (as a str), so we need to find it in the list of executors
             for _executor in self.executors:
-                if _executor.name and ti.executor in (_executor.name.alias, _executor.name.module_path):
+                if _executor.name and workload.get_executor_name() in (
+                    _executor.name.alias,
+                    _executor.name.module_path,
+                ):
                     # The executor must either match the team or be global (i.e. team_name is None)
                     if team_name and _executor.team_name == team_name or _executor.team_name is None:
                         executor = _executor
 
         if executor is not None:
-            self.log.debug("Found executor %s for task %s (team: %s)", executor.name, ti, team_name)
+            self.log.debug("Found executor %s for workload %s (team: %s)", executor.name, workload, team_name)
         else:
             # This case should not happen unless some (as of now unknown) edge case occurs or direct DB
             # modification, since the DAG parser will validate the tasks in the DAG and ensure the executor
             # they request is available and if not, disallow the DAG to be scheduled.
             # Keeping this exception handling because this is a critical issue if we do somehow find
             # ourselves here and the user should get some feedback about that.
-            self.log.warning("Executor, %s, was not found but a Task was configured to use it", ti.executor)
+            self.log.warning(
+                "Executor, %s, was not found but a workload was configured to use it",

Review Comment:
   My only concern is that I made this fairly extensible so that other
workload types can be added later (triggerer events in the executor, maybe?
Who knows. Now that the framework is here, I'm sure folks will come up with
ideas), so we'll eventually need to switch to the generic term. But maybe
today is not that day.
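
   To illustrate the extensibility point, here is a minimal, self-contained
sketch of why a generic term fits the lookup. It is not the PR's code: only
`get_executor_name()`, `alias`, `module_path`, and `team_name` appear in the
diff above, while `TaskWorkload`, `TriggerWorkload`, `ExecutorName`,
`FakeExecutor`, and `find_executor` are hypothetical stand-ins made up for
this example.

```python
from dataclasses import dataclass
from typing import Optional, Protocol


class Workload(Protocol):
    """Hypothetical interface: anything that can name the executor it wants."""

    def get_executor_name(self) -> Optional[str]: ...


@dataclass
class ExecutorName:
    # Stand-in for the executor name object used in the diff.
    alias: Optional[str]
    module_path: str


@dataclass
class FakeExecutor:
    # Stand-in executor; team_name=None means "global".
    name: ExecutorName
    team_name: Optional[str] = None


@dataclass
class TaskWorkload:
    # Hypothetical task-like workload.
    executor: Optional[str]

    def get_executor_name(self) -> Optional[str]:
        return self.executor


@dataclass
class TriggerWorkload:
    # Hypothetical future workload type; nothing in the lookup changes for it.
    executor: Optional[str]

    def get_executor_name(self) -> Optional[str]:
        return self.executor


def find_executor(workload: Workload, executors, team_name=None):
    """Mirror the matching loop from the diff for any workload type."""
    wanted = workload.get_executor_name()
    found = None
    for candidate in executors:
        if candidate.name and wanted in (candidate.name.alias, candidate.name.module_path):
            # The executor must either match the team or be global.
            if (team_name and candidate.team_name == team_name) or candidate.team_name is None:
                found = candidate
    return found


local = FakeExecutor(ExecutorName("local", "pkg.LocalExecutor"))
celery = FakeExecutor(ExecutorName("celery", "pkg.CeleryExecutor"), team_name="team_a")
executors = [local, celery]
```

   Because the lookup only depends on `get_executor_name()`, a log message
that says "workload" stays accurate no matter which type is passed in,
whereas "Task" would be wrong for the second type.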



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
