jedcunningham commented on a change in pull request #15263:
URL: https://github.com/apache/airflow/pull/15263#discussion_r616130169
##########
File path: airflow/executors/kubernetes_executor.py
##########
@@ -589,6 +598,44 @@ def sync(self) -> None:
break
# pylint: enable=too-many-nested-blocks
+ # Run any pending timed events
+ next_event = self.event_scheduler.run(blocking=False)
+ self.log.debug("Next timed event is in %f", next_event)
+
+ def _check_worker_pods_pending_timeout(self):
+ """Check if any pending worker pods have timed out"""
+ timeout = self.kube_config.worker_pods_pending_timeout
+ self.log.debug('Looking for pending worker pods older than %d
seconds', timeout)
+
+ kwargs = {
+ 'limit': self.kube_config.worker_pods_pending_timeout_batch_size,
+ 'field_selector': 'status.phase=Pending',
+ 'label_selector': f'airflow-worker={self.scheduler_job_id}',
Review comment:
@kaxil, after sitting down to switch to `kubernetes_executor=True` a
third time now, I'm leaning towards leaving it based on `scheduler_job_id` for
now. I think it is closer to "right", but what we really need is a unique id
for the airflow instance itself (most of KubernetsExecutor would actually
benefit from having one, in fact).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]