1fanwang commented on code in PR #68480:
URL: https://github.com/apache/airflow/pull/68480#discussion_r3437935121
##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py:
##########
@@ -311,92 +311,170 @@ def sync(self) -> None:
last_resource_version[ns] or
resource_instance.resource_version[ns]
)
- from kubernetes.client.rest import ApiException
-
if self.create_pods_after and self.create_pods_after > datetime.now():
self.log.warning("Skipping pod creation due to kubernetes rate
limit")
return
self.create_pods_after = None
+ if self.kube_config.async_pod_creation:
+ self._create_pods_concurrently()
+ else:
+ self._create_pods_sequentially()
+
+ def _create_pods_sequentially(self) -> None:
+ """Dequeue a batch and create worker pods one at a time (default
behavior)."""
+ from kubernetes.client.rest import ApiException
+
+ if TYPE_CHECKING:
+ assert self.kube_scheduler
+ created = 0
+ start = time.monotonic()
with contextlib.suppress(Empty):
for _ in range(self.kube_config.worker_pods_creation_batch_size):
task = self.task_queue.get_nowait()
-
+ created += 1
try:
- key = task.key
self.kube_scheduler.run_next(task)
Review Comment:
ah lemme clarify - it's moved not deleted
The old inline loop is now _create_pods_sequentially() and stays the
default: sync() calls it whenever async_pod_creation is off. The block that
looks removed is now _handle_pod_publish_error(), shared by both paths.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]