pierrejeambrun commented on code in PR #28161:
URL: https://github.com/apache/airflow/pull/28161#discussion_r1083578430


##########
airflow/utils/log/file_task_handler.py:
##########
@@ -204,91 +234,24 @@ def _read(self, ti: TaskInstance, try_number: int, metadata: dict[str, Any] | No
                 log = f"*** Failed to load local log file: {location}\n"
                 log += f"*** {str(e)}\n"
                 return log, {"end_of_log": True}
-        elif self._should_check_k8s(ti.queue):
-            try:
-                from airflow.kubernetes.kube_client import get_kube_client
-                from airflow.kubernetes.pod_generator import PodGenerator
-
-                client = get_kube_client()
-
-                log += f"*** Trying to get logs (last 100 lines) from worker 
pod {ti.hostname} ***\n\n"
-                selector = PodGenerator.build_selector_for_k8s_executor_pod(
-                    dag_id=ti.dag_id,
-                    task_id=ti.task_id,
-                    try_number=ti.try_number,
-                    map_index=ti.map_index,
-                    run_id=ti.run_id,
-                    airflow_worker=ti.queued_by_job_id,
-                )
-                namespace = self._get_pod_namespace(ti)
-                pod_list = client.list_namespaced_pod(
-                    namespace=namespace,
-                    label_selector=selector,
-                ).items
-                if not pod_list:
-                    raise RuntimeError("Cannot find pod for ti %s", ti)
-                elif len(pod_list) > 1:
-                    raise RuntimeError("Found multiple pods for ti %s: %s", 
ti, pod_list)
-                res = client.read_namespaced_pod_log(
-                    name=pod_list[0].metadata.name,
-                    namespace=namespace,
-                    container="base",
-                    follow=False,
-                    tail_lines=100,
-                    _preload_content=False,
-                )
+        else:
+            log += f"*** Local log file does not exist: {location}\n"
+            executor = ExecutorLoader.get_default_executor()
+            task_log = None
 
-                for line in res:
-                    log += line.decode()
+            if hasattr(executor, "get_task_log"):

Review Comment:
   Why do we need this check? I think it only helps for custom executors that are not `BaseExecutor` subclasses, and I believe another PR removed such a check.
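
For context, the guard being questioned is a duck-typing check roughly along these lines (a minimal sketch built from the names visible in the diff above; the helper name `_executor_task_log` and the exact call shape of `get_task_log` are assumptions, not the PR's actual code):

    from airflow.executors.executor_loader import ExecutorLoader

    def _executor_task_log(ti):
        # Hypothetical helper: ask the configured executor for any logs it
        # holds for this task instance.
        executor = ExecutorLoader.get_default_executor()
        # The hasattr guard only matters for custom executors that do not
        # subclass BaseExecutor and therefore might not define get_task_log.
        if hasattr(executor, "get_task_log"):
            return executor.get_task_log(ti)  # call shape assumed from the diff
        return None  # nothing executor-specific to show

If every executor subclasses `BaseExecutor` and that base class provides `get_task_log` (as the comment implies), the `hasattr` guard is redundant, which is the point being raised.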


