o-nikolas commented on code in PR #41904:
URL: https://github.com/apache/airflow/pull/41904#discussion_r1739332778


##########
airflow/providers/celery/executors/celery_kubernetes_executor.py:
##########
@@ -71,11 +75,21 @@ def kubernetes_queue(self) -> str:
 
     def __init__(self, celery_executor: CeleryExecutor, kubernetes_executor: KubernetesExecutor):
         super().__init__()
-        self._job_id: int | None = None
+        self._job_id: int | str | None = None
         self.celery_executor = celery_executor
         self.kubernetes_executor = kubernetes_executor
         self.kubernetes_executor.kubernetes_queue = self.kubernetes_queue
 
+    @property
+    def _task_event_logs(self):
+        self.celery_executor._task_event_logs += self.kubernetes_executor._task_event_logs

Review Comment:
   Do we need a copy here? Aren't you modifying the celery queue?
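   
   To illustrate the concern, here is a minimal sketch, not the Airflow implementation. It assumes `_task_event_logs` is a `collections.deque` and that the property returns the Celery queue (the hunk above is cut off before any `return`). `FakeExecutor` and `InPlaceHybrid` are hypothetical stand-ins. With `+=` and no copy or clear, every read of the property appends the Kubernetes records to the Celery queue again:

```python
from collections import deque


class FakeExecutor:
    """Hypothetical stand-in for a sub-executor that owns a log queue."""

    def __init__(self, logs):
        self._task_event_logs = deque(logs)


class InPlaceHybrid:
    """Mirrors the hunk above: `+=` extends the Celery deque itself."""

    def __init__(self, celery, kube):
        self.celery_executor = celery
        self.kubernetes_executor = kube

    @property
    def _task_event_logs(self):
        # deque.__iadd__ mutates the left-hand deque in place.
        self.celery_executor._task_event_logs += self.kubernetes_executor._task_event_logs
        # Assumed return value; the quoted hunk ends before this point.
        return self.celery_executor._task_event_logs


celery = FakeExecutor(["c1"])
kube = FakeExecutor(["k1"])
hybrid = InPlaceHybrid(celery, kube)

first = list(hybrid._task_event_logs)   # Kubernetes record appended once
second = list(hybrid._task_event_logs)  # ...and appended again on the second read

# Building a new deque instead leaves both sub-executor queues untouched.
celery2 = FakeExecutor(["c1"])
kube2 = FakeExecutor(["k1"])
merged_copy = deque(celery2._task_event_logs) + kube2._task_event_logs
```

   Whether a copy is the right answer depends on how the caller consumes the queue: a copy avoids mutating Celery's state, but a caller that drains the returned object would then be draining the copy rather than the real queues.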



##########
airflow/providers/celery/executors/celery_kubernetes_executor.py:
##########
@@ -71,11 +75,21 @@ def kubernetes_queue(self) -> str:
 
     def __init__(self, celery_executor: CeleryExecutor, kubernetes_executor: KubernetesExecutor):
         super().__init__()
-        self._job_id: int | None = None
+        self._job_id: int | str | None = None
         self.celery_executor = celery_executor
         self.kubernetes_executor = kubernetes_executor
         self.kubernetes_executor.kubernetes_queue = self.kubernetes_queue
 
+    @property
+    def _task_event_logs(self):
+        self.celery_executor._task_event_logs += self.kubernetes_executor._task_event_logs
+        self.kubernetes_executor._task_event_logs.clear()

Review Comment:
   Will something downstream clear the celery queue like you did for the kube 
one above?
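   
   A rough sketch of why this matters, under stated assumptions: `_task_event_logs` is a `collections.deque`, the property returns the Celery queue, and `drain` and `peek` are hypothetical consumers rather than real Airflow functions. If the consumer pops records off the returned object, the Celery queue is emptied as a side effect; if it only reads them, the merged records stay in the Celery queue and are seen again on the next read:

```python
from collections import deque


class FakeExecutor:
    """Hypothetical stand-in for a sub-executor that owns a log queue."""

    def __init__(self, logs):
        self._task_event_logs = deque(logs)


class ClearingHybrid:
    """Mirrors the hunk above: merge into Celery's deque, then clear Kubernetes'."""

    def __init__(self, celery, kube):
        self.celery_executor = celery
        self.kubernetes_executor = kube

    @property
    def _task_event_logs(self):
        self.celery_executor._task_event_logs += self.kubernetes_executor._task_event_logs
        self.kubernetes_executor._task_event_logs.clear()
        # Assumed return value; the quoted hunk ends before this point.
        return self.celery_executor._task_event_logs


def drain(log_records):
    """Hypothetical consumer that pops every record off the deque it is handed."""
    return [log_records.popleft() for _ in range(len(log_records))]


def peek(log_records):
    """Hypothetical consumer that reads the records without removing them."""
    return list(log_records)


# Case 1: a draining consumer empties the Celery queue, since it holds the same object.
celery_a, kube_a = FakeExecutor(["c1"]), FakeExecutor(["k1"])
drained = drain(ClearingHybrid(celery_a, kube_a)._task_event_logs)
remaining_after_drain = len(celery_a._task_event_logs)

# Case 2: a read-only consumer sees the same records on every read.
celery_b, kube_b = FakeExecutor(["c1"]), FakeExecutor(["k1"])
hybrid_b = ClearingHybrid(celery_b, kube_b)
seen_first = peek(hybrid_b._task_event_logs)
seen_second = peek(hybrid_b._task_event_logs)
remaining_after_peek = len(celery_b._task_event_logs)
```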



##########
airflow/providers/celery/executors/celery_kubernetes_executor.py:
##########
@@ -30,18 +31,21 @@
 
     raise AirflowOptionalProviderFeatureException(e)
 
-from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.utils.providers_configuration_loader import providers_configuration_loaded
 
 if TYPE_CHECKING:
     from airflow.callbacks.base_callback_sink import BaseCallbackSink
     from airflow.callbacks.callback_requests import CallbackRequest
-    from airflow.executors.base_executor import CommandType, EventBufferValueType, QueuedTaskInstanceType
+    from airflow.executors.base_executor import (
+        CommandType,
+        EventBufferValueType,
+        QueuedTaskInstanceType,
+    )
     from airflow.models.taskinstance import SimpleTaskInstance, TaskInstance
     from airflow.models.taskinstancekey import TaskInstanceKey
 
 
-class CeleryKubernetesExecutor(LoggingMixin):
+class CeleryKubernetesExecutor(BaseExecutor):

Review Comment:
   I think this might end up just making it harder to notice issues with these 
executors. It's a bit tricky to explain, but let's take this issue as an 
example:
   
   The real fix is the implementation you added below, which takes the log 
queues from each sub-executor and joins them. If this class had already been 
inheriting from the base executor (without the real fix below), the user would 
not have seen the exception they did, because this CKE class would have had an 
empty log queue from the base executor that never gets used. Meanwhile, the 
sub-executors (both Celery and Kubernetes) would have had their own log queues, 
which are the ones actually used. So the user would have seen no exceptions, 
but the logs from the sub-executors would never have propagated back up (again, 
without the snippet below). So the inheritance doesn't really fix anything; it 
mostly just obscures the problem. The user might have noticed missing logs and 
eventually found the reason, but that is much harder to detect than an 
exception that says "hey, this property is missing".
   
   Overall, these hybrid classes are awful and we all agree we want to get rid 
of them. I'm just not sure whether, in the meantime, this specific inheritance 
helps more than it hurts. I hope that makes sense!
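   
   A small sketch of the scenario described above, using hypothetical stand-in classes rather than the real Airflow ones. It assumes the base executor creates an empty `_task_event_logs` deque in `__init__`. Without the base class, reading the attribute on the hybrid fails loudly; with it (and without the merging property), the read succeeds but returns an empty queue while the real records sit in the sub-executors:

```python
from collections import deque


class FakeBaseExecutor:
    """Hypothetical base class that gives every executor its own log queue."""

    def __init__(self):
        self._task_event_logs = deque()


class FakeSubExecutor(FakeBaseExecutor):
    """Hypothetical sub-executor that records a log entry when it runs."""

    def run(self, name):
        self._task_event_logs.append(f"log from {name}")


class HybridWithoutBase:
    """Hybrid with no base class: it has no `_task_event_logs` at all."""

    def __init__(self, celery, kube):
        self.celery_executor = celery
        self.kubernetes_executor = kube


class HybridWithBase(FakeBaseExecutor):
    """Hybrid that inherits the base queue but never merges the sub-executor queues."""

    def __init__(self, celery, kube):
        super().__init__()
        self.celery_executor = celery
        self.kubernetes_executor = kube


celery, kube = FakeSubExecutor(), FakeSubExecutor()
celery.run("celery")
kube.run("kubernetes")

# Without the base class the missing attribute surfaces immediately.
try:
    HybridWithoutBase(celery, kube)._task_event_logs
    loud_failure = False
except AttributeError:
    loud_failure = True

# With the base class the read succeeds, but the sub-executor logs are not there.
silent_logs = list(HybridWithBase(celery, kube)._task_event_logs)
```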



##########
airflow/providers/cncf/kubernetes/executors/local_kubernetes_executor.py:
##########
@@ -20,18 +20,22 @@
 from typing import TYPE_CHECKING, Sequence

Review Comment:
   All of the comments above on `celery_kubernetes_executor.py` also apply here.


