dstandish commented on code in PR #40017:
URL: https://github.com/apache/airflow/pull/40017#discussion_r1645060065
##########
airflow/jobs/scheduler_job_runner.py:
##########
@@ -1844,6 +1895,58 @@ def _executor_to_tis(self, tis: list[TaskInstance]) ->
dict[BaseExecutor, list[T
executor = str(ti.executor)
else:
executor = None
- _executor_to_tis[ExecutorLoader.load_executor(executor)].append(ti)
+
+ if executor_obj := self._try_to_load_executor(executor):
+ _executor_to_tis[executor_obj].append(ti)
+ else:
+ continue
return _executor_to_tis
+
+ def _slots_free_for_tis(self, tis: list[TaskInstance]) ->
set[TaskInstance]:
+ """Return TIs that we have slots available for."""
+ # First get a mapping of executor names to slots they have available
+ executor_to_slots_available: dict[ExecutorName, int] = {}
+ for executor in self.job.executors:
+ # All executors should have a name if they are initted from the
executor_loader. But we need to
+ # check for None to make mypy happy.
+ if executor.name:
+ executor_to_slots_available[executor.name] =
executor.slots_available
+ else:
+ raise AirflowException(f"Executor {executor} did not have a
`name` field configured!")
+
+ # Loop through all the TIs we're scheduling for and add them to a set
to be moved to queued if the
+ # executor they're assigned to has slots available.
+ tis_we_have_room_for = set()
+ for ti in tis:
+ if ti.executor:
+ executor = str(ti.executor)
+ else:
+ executor = None
Review Comment:
Is this another case for `if TYPE_CHECKING:`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]