dstandish commented on code in PR #40017:
URL: https://github.com/apache/airflow/pull/40017#discussion_r1649445406


##########
airflow/jobs/scheduler_job_runner.py:
##########
@@ -1838,12 +1890,61 @@ def _orphan_unreferenced_datasets(self, session: 
Session = NEW_SESSION) -> None:
     def _executor_to_tis(self, tis: list[TaskInstance]) -> dict[BaseExecutor, 
list[TaskInstance]]:
         """Organize TIs into lists per their respective executor."""
         _executor_to_tis: defaultdict[BaseExecutor, list[TaskInstance]] = 
defaultdict(list)
-        executor: str | None
         for ti in tis:
-            if ti.executor:
-                executor = str(ti.executor)
+            if executor_obj := self._try_to_load_executor(ti.executor):
+                _executor_to_tis[executor_obj].append(ti)
             else:
-                executor = None
-            _executor_to_tis[ExecutorLoader.load_executor(executor)].append(ti)
+                continue
 
         return _executor_to_tis
+
+    def _slots_free_for_tis(self, tis: list[TaskInstance]) -> 
set[TaskInstance]:
+        """Return TIs that we have slots available for."""
+        # First get a mapping of executor names to slots they have available
+        executor_to_slots_available: dict[ExecutorName, int] = {}
+        for executor in self.job.executors:
+            if TYPE_CHECKING:
+                # All executors should have a name if they are initted from 
the executor_loader. But we need
+                # to check for None to make mypy happy.
+                assert executor.name
+            executor_to_slots_available[executor.name] = 
executor.slots_available
+
+        # Loop through all the TIs we're scheduling for and add them to a set 
to be moved to queued if the
+        # executor they're assigned to has slots available.
+        tis_we_have_room_for = set()
+        for ti in tis:
+            if executor_obj := self._try_to_load_executor(ti.executor):
+                if TYPE_CHECKING:
+                    # All executors should have a name if they are initted 
from the executor_loader. But we
+                    # need to check for None to make mypy happy.
+                    assert executor_obj.name
+                if executor_to_slots_available[executor_obj.name] > 0:
+                    tis_we_have_room_for.add(ti)
+                    executor_to_slots_available[executor_obj.name] -= 1
+            else:
+                continue
+
+        return tis_we_have_room_for
+
+    def _try_to_load_executor(self, executor_name: str | None) -> BaseExecutor 
| None:
+        """Try to load the given executor.
+
+        In this context, we don't want to fail if the executor does not exist. 
Catch the exception and
+        log to the user.
+        """
+        try:
+            return ExecutorLoader.load_executor(executor_name)
+        except ValueError as e:
+            # This case should not happen unless some (as of now unknown) edge 
case occurs or direct DB
+            # modification, since the DAG parser will validate the tasks in 
the DAG and ensure the executor
+            # they request is available and if not, disallow the DAG to be 
scheduled.
+            # Keeping this exception handling because this is a critical issue 
if we do somehow find
+            # ourselves here and the user should get some feedback about that.

Review Comment:
   Nice



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to