o-nikolas commented on code in PR #28161:
URL: https://github.com/apache/airflow/pull/28161#discussion_r1044914857


##########
airflow/utils/log/file_task_handler.py:
##########
@@ -190,74 +220,22 @@ def _read(self, ti: TaskInstance, try_number: int, metadata: dict[str, Any] | No
                 log = f"*** Failed to load local log file: {location}\n"
                 log += f"*** {str(e)}\n"
                 return log, {"end_of_log": True}
-        elif self._should_check_k8s(ti.queue):
-            try:
-                from airflow.kubernetes.kube_client import get_kube_client
-
-                kube_client = get_kube_client()
+        else:
 
-                log += f"*** Trying to get logs (last 100 lines) from worker pod {ti.hostname} ***\n\n"
+            log += f"*** Log file does not exist: {location}\n"

Review Comment:
   ```suggestion
            log += f"*** Local log file does not exist, trying to fetch logs from executor environment ***\n\n"
   ```
   
   This more closely matches what was there previously as well as the new context you added.



##########
airflow/utils/log/file_task_handler.py:
##########
@@ -190,74 +220,22 @@ def _read(self, ti: TaskInstance, try_number: int, metadata: dict[str, Any] | No
                 log = f"*** Failed to load local log file: {location}\n"
                 log += f"*** {str(e)}\n"
                 return log, {"end_of_log": True}
-        elif self._should_check_k8s(ti.queue):
-            try:
-                from airflow.kubernetes.kube_client import get_kube_client
-
-                kube_client = get_kube_client()
+        else:
 
-                log += f"*** Trying to get logs (last 100 lines) from worker pod {ti.hostname} ***\n\n"
+            log += f"*** Log file does not exist: {location}\n"
+            executor = ExecutorLoader.get_default_executor()

Review Comment:
   You may need to first check if that method exists on the executor class before calling it, pending the result of the discussion here: https://github.com/apache/airflow/issues/28276#issuecomment-1344899475
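   Such a guard could look roughly like this (a sketch only; `fetch_executor_task_log` is a hypothetical helper name, and whether `get_task_log` ends up on the base executor is exactly what the linked discussion will decide):

   ```python
   # Sketch: guard the call in case get_task_log is not defined on every
   # executor class (pending the outcome of the linked discussion).
   def fetch_executor_task_log(executor, ti):
       """Return the executor-provided task log, or None when unsupported."""
       get_log = getattr(executor, "get_task_log", None)  # hypothetical guard
       if callable(get_log):
           return get_log(ti)
       return None
   ```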



##########
tests/utils/test_log_handlers.py:
##########
@@ -267,36 +264,3 @@ def test_log_retrieval_valid(self, create_task_instance):
         log_url_ti.hostname = "hostname"
        url = FileTaskHandler._get_log_retrieval_url(log_url_ti, "DYNAMIC_PATH")
        assert url == "http://hostname:8793/log/DYNAMIC_PATH"
-
-
-@pytest.mark.parametrize(
-    "config, queue, expected",
-    [
-        (dict(AIRFLOW__CORE__EXECUTOR="LocalExecutor"), None, False),
-        (dict(AIRFLOW__CORE__EXECUTOR="LocalExecutor"), "kubernetes", False),
-        (dict(AIRFLOW__CORE__EXECUTOR="KubernetesExecutor"), None, True),
-        (dict(AIRFLOW__CORE__EXECUTOR="CeleryKubernetesExecutor"), "any", False),
-        (dict(AIRFLOW__CORE__EXECUTOR="CeleryKubernetesExecutor"), "kubernetes", True),
-        (
-            dict(
-                AIRFLOW__CORE__EXECUTOR="CeleryKubernetesExecutor",
-                AIRFLOW__CELERY_KUBERNETES_EXECUTOR__KUBERNETES_QUEUE="hithere",
-            ),
-            "hithere",
-            True,
-        ),
-        (dict(AIRFLOW__CORE__EXECUTOR="LocalKubernetesExecutor"), "any", False),
-        (dict(AIRFLOW__CORE__EXECUTOR="LocalKubernetesExecutor"), "kubernetes", True),
-        (
-            dict(
-                AIRFLOW__CORE__EXECUTOR="LocalKubernetesExecutor",
-                AIRFLOW__LOCAL_KUBERNETES_EXECUTOR__KUBERNETES_QUEUE="hithere",
-            ),
-            "hithere",
-            True,
-        ),
-    ],
-)
-def test__should_check_k8s(config, queue, expected):
-    with patch.dict("os.environ", **config):
-        assert FileTaskHandler._should_check_k8s(queue) == expected

Review Comment:
   You've nicely refactored `FileTaskHandler._read` to be unit-testable. You can mock `os.path.exists(location)` to return False and also mock the kubernetes executor, then ensure `get_task_log` was called once with the expected ti input. You should then swap the executor for one that doesn't have an implementation and ensure you get None back (you shouldn't need to mock it in that case, since it has no implementation) and that the `_get_task_log_from_worker` method is called once (you will need to mock that one).
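   The mocking pattern being suggested might be sketched like this (a sketch with stand-in classes, since the real test would target the actual airflow `FileTaskHandler` and executor classes; `FakeHandler` and its methods are hypothetical stand-ins mirroring this PR's control flow):

   ```python
   from unittest import mock

   # Hypothetical stand-in mirroring the PR's fallback flow in _read:
   # try the executor's get_task_log first, fall back to the worker.
   class FakeHandler:
       def read_logs(self, executor, ti):
           task_log = executor.get_task_log(ti)
           if task_log is None:
               task_log = self._get_task_log_from_worker(ti, "")
           return task_log

       def _get_task_log_from_worker(self, ti, log):
           return "worker log"

   def test_executor_log_path():
       handler = FakeHandler()
       executor = mock.MagicMock()
       executor.get_task_log.return_value = "pod log"
       assert handler.read_logs(executor, "ti") == "pod log"
       executor.get_task_log.assert_called_once_with("ti")

   def test_worker_fallback():
       handler = FakeHandler()
       executor = mock.MagicMock()
       executor.get_task_log.return_value = None  # no implementation case
       with mock.patch.object(
           FakeHandler, "_get_task_log_from_worker", return_value="worker log"
       ) as worker_mock:
           assert handler.read_logs(executor, "ti") == "worker log"
           worker_mock.assert_called_once()
   ```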



##########
airflow/utils/log/file_task_handler.py:
##########
@@ -190,74 +220,22 @@ def _read(self, ti: TaskInstance, try_number: int, metadata: dict[str, Any] | No
                 log = f"*** Failed to load local log file: {location}\n"
                 log += f"*** {str(e)}\n"
                 return log, {"end_of_log": True}
-        elif self._should_check_k8s(ti.queue):
-            try:
-                from airflow.kubernetes.kube_client import get_kube_client
-
-                kube_client = get_kube_client()
+        else:
 
-                log += f"*** Trying to get logs (last 100 lines) from worker pod {ti.hostname} ***\n\n"
+            log += f"*** Log file does not exist: {location}\n"
+            executor = ExecutorLoader.get_default_executor()
+            task_log = executor.get_task_log(ti)
 
-                res = kube_client.read_namespaced_pod_log(
-                    name=ti.hostname,
-                    namespace=conf.get("kubernetes_executor", "namespace"),
-                    container="base",
-                    follow=False,
-                    tail_lines=100,
-                    _preload_content=False,
-                )
+            if isinstance(task_log, tuple):
+                return task_log
 
-                for line in res:
-                    log += line.decode()
+            if task_log is None:
+                task_log = self._get_task_log_from_worker(ti, log, log_relative_path=log_relative_path)

Review Comment:
   Should add a log here, saying that we're now falling back to fetching from 
worker.
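   For instance (a sketch only; the helper name and message wording are hypothetical, not taken from the PR):

   ```python
   # Hypothetical helper illustrating the suggested fallback notice, appended
   # to the accumulated log string before delegating to the worker fetch:
   def with_worker_fallback_notice(log: str) -> str:
       """Append a notice that log fetching is falling back to the worker."""
       return log + "*** Falling back to fetching logs from worker ***\n"
   ```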



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
