o-nikolas commented on code in PR #41904:
URL: https://github.com/apache/airflow/pull/41904#discussion_r1739332778
##########
airflow/providers/celery/executors/celery_kubernetes_executor.py:
##########
@@ -71,11 +75,21 @@ def kubernetes_queue(self) -> str:
     def __init__(self, celery_executor: CeleryExecutor, kubernetes_executor: KubernetesExecutor):
         super().__init__()
-        self._job_id: int | None = None
+        self._job_id: int | str | None = None
         self.celery_executor = celery_executor
         self.kubernetes_executor = kubernetes_executor
         self.kubernetes_executor.kubernetes_queue = self.kubernetes_queue
+    @property
+    def _task_event_logs(self):
+        self.celery_executor._task_event_logs += self.kubernetes_executor._task_event_logs
Review Comment:
Do we need a copy here? Aren't you modifying the celery queue?
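To illustrate the concern, here is a minimal sketch of the difference between extending in place and building a copy. It assumes the log queues are `collections.deque` objects; `StubExecutor`, `merged_in_place`, and `merged_copy` are hypothetical names for illustration, not Airflow APIs.

```python
from collections import deque


class StubExecutor:
    """Hypothetical stand-in for a sub-executor; only the log queue is modeled."""

    def __init__(self, *events):
        self._task_event_logs = deque(events)


def merged_in_place(celery, kube):
    # Mirrors the line in the diff: += extends the celery deque itself,
    # so the celery executor's own queue now holds the kube records too.
    celery._task_event_logs += kube._task_event_logs
    return celery._task_event_logs


def merged_copy(celery, kube):
    # Alternative: + builds a new deque and leaves both sources untouched.
    return celery._task_event_logs + kube._task_event_logs
```

With the in-place version, every read of the property appends the kube records to the celery queue again unless the kube queue is emptied afterwards; the copy version never touches either source.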
##########
airflow/providers/celery/executors/celery_kubernetes_executor.py:
##########
@@ -71,11 +75,21 @@ def kubernetes_queue(self) -> str:
     def __init__(self, celery_executor: CeleryExecutor, kubernetes_executor: KubernetesExecutor):
         super().__init__()
-        self._job_id: int | None = None
+        self._job_id: int | str | None = None
         self.celery_executor = celery_executor
         self.kubernetes_executor = kubernetes_executor
         self.kubernetes_executor.kubernetes_queue = self.kubernetes_queue
+    @property
+    def _task_event_logs(self):
+        self.celery_executor._task_event_logs += self.kubernetes_executor._task_event_logs
+        self.kubernetes_executor._task_event_logs.clear()
Review Comment:
Will something downstream clear the celery queue like you did for the kube
one above?
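A rough sketch of why the answer depends on the consumer: if the property returns the celery deque itself (an assumption, since the return statement is not visible in the hunk) and the caller drains whatever it receives, then the celery queue is emptied as a side effect. `HybridSketch`, `StubExecutor`, and `drain` are hypothetical names; whether the real scheduler code drains the queue this way is not confirmed here.

```python
from collections import deque


class StubExecutor:
    """Hypothetical stand-in for a sub-executor; only the log queue is modeled."""

    def __init__(self, *events):
        self._task_event_logs = deque(events)


class HybridSketch:
    """Hypothetical, reduced version of the property shown in the diff."""

    def __init__(self, celery, kube):
        self.celery_executor = celery
        self.kubernetes_executor = kube

    @property
    def _task_event_logs(self):
        self.celery_executor._task_event_logs += self.kubernetes_executor._task_event_logs
        self.kubernetes_executor._task_event_logs.clear()
        # Assumption: the merged celery deque is what gets returned.
        return self.celery_executor._task_event_logs


def drain(queue):
    """Hypothetical consumer: pops every record off the deque it is given."""
    return [queue.popleft() for _ in range(len(queue))]
```

If the consumer only iterates over the returned deque without popping or clearing it, the celery records would stay in place and be returned again on the next read.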
##########
airflow/providers/celery/executors/celery_kubernetes_executor.py:
##########
@@ -30,18 +31,21 @@
     raise AirflowOptionalProviderFeatureException(e)
-from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.utils.providers_configuration_loader import providers_configuration_loaded
 if TYPE_CHECKING:
     from airflow.callbacks.base_callback_sink import BaseCallbackSink
     from airflow.callbacks.callback_requests import CallbackRequest
-    from airflow.executors.base_executor import CommandType, EventBufferValueType, QueuedTaskInstanceType
+    from airflow.executors.base_executor import (
+        CommandType,
+        EventBufferValueType,
+        QueuedTaskInstanceType,
+    )
     from airflow.models.taskinstance import SimpleTaskInstance, TaskInstance
     from airflow.models.taskinstancekey import TaskInstanceKey
-class CeleryKubernetesExecutor(LoggingMixin):
+class CeleryKubernetesExecutor(BaseExecutor):
Review Comment:
I think this might just make issues with these executors harder to notice.
It's a bit tricky to explain, so let's take this issue as an example.
The real fix is the implementation you added below, which reads the log
queues from each sub-executor and joins them. If this class had already been
inheriting from the base executor (without that fix), the user would never have
seen the exception they did: the CKE class would have had its own empty log
queue from the base executor, and nothing would ever have written to it.
Meanwhile the sub-executors (both Celery and Kubernetes) would each have had
their own log queues, which actually would have been used. So there would have
been no exception, but the logs from the sub-executors would never have
propagated back up. The inheritance doesn't really fix anything; it mostly
obscures the problem. The user might have noticed the missing logs and
eventually found the reason, but that is much harder to detect than an
exception that says "hey, this property is missing".
Overall, these hybrid classes are awful and we all agree we want to get rid
of them. I'm just not sure whether, in the meantime, this specific inheritance
helps more than it hurts. I hope that makes sense!
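The masking effect described in this comment can be sketched roughly as follows. All class names here (`BaseSketch`, `SubExecutor`, `HybridWithoutBase`, `HybridWithBase`) are hypothetical stand-ins, and the base class is assumed to create an empty `deque` for its log queue; none of this is the actual Airflow code.

```python
from collections import deque


class BaseSketch:
    """Hypothetical stand-in for a base executor that owns a log queue."""

    def __init__(self):
        self._task_event_logs = deque()


class SubExecutor(BaseSketch):
    """Hypothetical sub-executor that writes to its own queue."""

    def log(self, event):
        self._task_event_logs.append(event)


class HybridWithoutBase:
    """No base class and no property: the attribute is simply missing."""

    def __init__(self, celery, kube):
        self.celery_executor = celery
        self.kubernetes_executor = kube


class HybridWithBase(BaseSketch):
    """Inherits an empty queue of its own that no sub-executor writes to."""

    def __init__(self, celery, kube):
        super().__init__()
        self.celery_executor = celery
        self.kubernetes_executor = kube
```

Reading `_task_event_logs` on `HybridWithoutBase` raises `AttributeError`, which is loud and easy to trace. On `HybridWithBase` the same read succeeds and returns an empty queue, while the records sit unread in the sub-executors.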
##########
airflow/providers/cncf/kubernetes/executors/local_kubernetes_executor.py:
##########
@@ -20,18 +20,22 @@
from typing import TYPE_CHECKING, Sequence
Review Comment:
All comments above also apply here