o-nikolas commented on code in PR #28161: URL: https://github.com/apache/airflow/pull/28161#discussion_r1044914857
########## airflow/utils/log/file_task_handler.py: ########## @@ -190,74 +220,22 @@ def _read(self, ti: TaskInstance, try_number: int, metadata: dict[str, Any] | No log = f"*** Failed to load local log file: {location}\n" log += f"*** {str(e)}\n" return log, {"end_of_log": True} - elif self._should_check_k8s(ti.queue): - try: - from airflow.kubernetes.kube_client import get_kube_client - - kube_client = get_kube_client() + else: - log += f"*** Trying to get logs (last 100 lines) from worker pod {ti.hostname} ***\n\n" + log += f"*** Log file does not exist: {location}\n" Review Comment: ```suggestion log += f"*** Local log file does not exist, trying to fetch logs from executor environment ***\n\n" ``` This more closely matches what was there previously as well as the new context you added. ########## airflow/utils/log/file_task_handler.py: ########## @@ -190,74 +220,22 @@ def _read(self, ti: TaskInstance, try_number: int, metadata: dict[str, Any] | No log = f"*** Failed to load local log file: {location}\n" log += f"*** {str(e)}\n" return log, {"end_of_log": True} - elif self._should_check_k8s(ti.queue): - try: - from airflow.kubernetes.kube_client import get_kube_client - - kube_client = get_kube_client() + else: - log += f"*** Trying to get logs (last 100 lines) from worker pod {ti.hostname} ***\n\n" + log += f"*** Log file does not exist: {location}\n" + executor = ExecutorLoader.get_default_executor() Review Comment: You may need to first check if that method exists on the executor class before calling it, pending the result of the discussion here: https://github.com/apache/airflow/issues/28276#issuecomment-1344899475 ########## tests/utils/test_log_handlers.py: ########## @@ -267,36 +264,3 @@ def test_log_retrieval_valid(self, create_task_instance): log_url_ti.hostname = "hostname" url = FileTaskHandler._get_log_retrieval_url(log_url_ti, "DYNAMIC_PATH") assert url == "http://hostname:8793/log/DYNAMIC_PATH" - - -@pytest.mark.parametrize( - "config, queue, expected",
- [ - (dict(AIRFLOW__CORE__EXECUTOR="LocalExecutor"), None, False), - (dict(AIRFLOW__CORE__EXECUTOR="LocalExecutor"), "kubernetes", False), - (dict(AIRFLOW__CORE__EXECUTOR="KubernetesExecutor"), None, True), - (dict(AIRFLOW__CORE__EXECUTOR="CeleryKubernetesExecutor"), "any", False), - (dict(AIRFLOW__CORE__EXECUTOR="CeleryKubernetesExecutor"), "kubernetes", True), - ( - dict( - AIRFLOW__CORE__EXECUTOR="CeleryKubernetesExecutor", - AIRFLOW__CELERY_KUBERNETES_EXECUTOR__KUBERNETES_QUEUE="hithere", - ), - "hithere", - True, - ), - (dict(AIRFLOW__CORE__EXECUTOR="LocalKubernetesExecutor"), "any", False), - (dict(AIRFLOW__CORE__EXECUTOR="LocalKubernetesExecutor"), "kubernetes", True), - ( - dict( - AIRFLOW__CORE__EXECUTOR="LocalKubernetesExecutor", - AIRFLOW__LOCAL_KUBERNETES_EXECUTOR__KUBERNETES_QUEUE="hithere", - ), - "hithere", - True, - ), - ], -) -def test__should_check_k8s(config, queue, expected): - with patch.dict("os.environ", **config): - assert FileTaskHandler._should_check_k8s(queue) == expected Review Comment: You've nicely refactored `FileTaskHandler._read` to be unit-testable. You can mock `os.path.exists(location)` to return `False` and also mock the kubernetes executor, then ensure `get_task_log` was called once with the expected `ti` input. You should then swap the executor to one that doesn't have an implementation and ensure you get `None` back (you shouldn't need to mock in that case since it has no implementation) and that the `_get_task_log_from_worker` method is called once (will need to mock that one). 
########## airflow/utils/log/file_task_handler.py: ########## @@ -190,74 +220,22 @@ def _read(self, ti: TaskInstance, try_number: int, metadata: dict[str, Any] | No log = f"*** Failed to load local log file: {location}\n" log += f"*** {str(e)}\n" return log, {"end_of_log": True} - elif self._should_check_k8s(ti.queue): - try: - from airflow.kubernetes.kube_client import get_kube_client - - kube_client = get_kube_client() + else: - log += f"*** Trying to get logs (last 100 lines) from worker pod {ti.hostname} ***\n\n" + log += f"*** Log file does not exist: {location}\n" + executor = ExecutorLoader.get_default_executor() + task_log = executor.get_task_log(ti) - res = kube_client.read_namespaced_pod_log( - name=ti.hostname, - namespace=conf.get("kubernetes_executor", "namespace"), - container="base", - follow=False, - tail_lines=100, - _preload_content=False, - ) + if isinstance(task_log, tuple): + return task_log - for line in res: - log += line.decode() + if task_log is None: + task_log = self._get_task_log_from_worker(ti, log, log_relative_path=log_relative_path) Review Comment: We should add a log line here, saying that we're now falling back to fetching from the worker. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org