o-nikolas commented on code in PR #28161:
URL: https://github.com/apache/airflow/pull/28161#discussion_r1044914857
##########
airflow/utils/log/file_task_handler.py:
##########
@@ -190,74 +220,22 @@ def _read(self, ti: TaskInstance, try_number: int, metadata: dict[str, Any] | No
log = f"*** Failed to load local log file: {location}\n"
log += f"*** {str(e)}\n"
return log, {"end_of_log": True}
- elif self._should_check_k8s(ti.queue):
- try:
- from airflow.kubernetes.kube_client import get_kube_client
-
- kube_client = get_kube_client()
+ else:
- log += f"*** Trying to get logs (last 100 lines) from worker pod {ti.hostname} ***\n\n"
+ log += f"*** Log file does not exist: {location}\n"
Review Comment:
```suggestion
log += f"*** Local log file does not exist, trying to fetch logs from executor environment ***\n\n"
```
This more closely matches what was there previously, as well as the new context you added.
##########
airflow/utils/log/file_task_handler.py:
##########
@@ -190,74 +220,22 @@ def _read(self, ti: TaskInstance, try_number: int, metadata: dict[str, Any] | No
log = f"*** Failed to load local log file: {location}\n"
log += f"*** {str(e)}\n"
return log, {"end_of_log": True}
- elif self._should_check_k8s(ti.queue):
- try:
- from airflow.kubernetes.kube_client import get_kube_client
-
- kube_client = get_kube_client()
+ else:
- log += f"*** Trying to get logs (last 100 lines) from worker pod {ti.hostname} ***\n\n"
+ log += f"*** Log file does not exist: {location}\n"
+ executor = ExecutorLoader.get_default_executor()
Review Comment:
You may need to check first whether that method exists on the executor class before calling it, pending the result of the discussion here:
https://github.com/apache/airflow/issues/28276#issuecomment-1344899475
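A guarded call could look like the sketch below. This is illustrative only: `fetch_task_log` and both executor classes are hypothetical stand-ins, not Airflow APIs.

```python
class ExecutorWithLog:
    """Hypothetical executor that does implement get_task_log."""

    def get_task_log(self, ti):
        return f"logs for {ti}"


class ExecutorWithoutLog:
    """Hypothetical executor with no get_task_log at all."""


def fetch_task_log(executor, ti):
    # Guarded call: only invoke get_task_log when the executor defines it,
    # so callers can fall back to the worker fetch on None.
    method = getattr(executor, "get_task_log", None)
    if callable(method):
        return method(ti)
    return None
```

If the linked discussion instead lands on a no-op implementation in the executor base class, the `getattr` guard becomes unnecessary.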
##########
tests/utils/test_log_handlers.py:
##########
@@ -267,36 +264,3 @@ def test_log_retrieval_valid(self, create_task_instance):
log_url_ti.hostname = "hostname"
url = FileTaskHandler._get_log_retrieval_url(log_url_ti, "DYNAMIC_PATH")
assert url == "http://hostname:8793/log/DYNAMIC_PATH"
-
-
-@pytest.mark.parametrize(
-    "config, queue, expected",
-    [
-        (dict(AIRFLOW__CORE__EXECUTOR="LocalExecutor"), None, False),
-        (dict(AIRFLOW__CORE__EXECUTOR="LocalExecutor"), "kubernetes", False),
-        (dict(AIRFLOW__CORE__EXECUTOR="KubernetesExecutor"), None, True),
-        (dict(AIRFLOW__CORE__EXECUTOR="CeleryKubernetesExecutor"), "any", False),
-        (dict(AIRFLOW__CORE__EXECUTOR="CeleryKubernetesExecutor"), "kubernetes", True),
-        (
-            dict(
-                AIRFLOW__CORE__EXECUTOR="CeleryKubernetesExecutor",
-                AIRFLOW__CELERY_KUBERNETES_EXECUTOR__KUBERNETES_QUEUE="hithere",
-            ),
-            "hithere",
-            True,
-        ),
-        (dict(AIRFLOW__CORE__EXECUTOR="LocalKubernetesExecutor"), "any", False),
-        (dict(AIRFLOW__CORE__EXECUTOR="LocalKubernetesExecutor"), "kubernetes", True),
-        (
-            dict(
-                AIRFLOW__CORE__EXECUTOR="LocalKubernetesExecutor",
-                AIRFLOW__LOCAL_KUBERNETES_EXECUTOR__KUBERNETES_QUEUE="hithere",
-            ),
-            "hithere",
-            True,
-        ),
-    ],
-)
-def test__should_check_k8s(config, queue, expected):
-    with patch.dict("os.environ", **config):
-        assert FileTaskHandler._should_check_k8s(queue) == expected
Review Comment:
You've nicely refactored `FileTaskHandler._read` to be unit-testable. You can mock `os.path.exists(location)` to return `False` and also mock the Kubernetes executor, then ensure `get_task_log` was called once with the expected `ti` input. You should then swap the executor for one that doesn't have an implementation and ensure you get `None` back (you shouldn't need a mock in that case, since there is no implementation) and that the `_get_task_log_from_worker` method is called once (you will need to mock that one).
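A minimal sketch of that mocking pattern, using only the standard library. The executor, `ti`, and the worker-fetch helper are `Mock` stand-ins for the real Airflow objects, and the `if task_log is None` fallback mirrors the PR's `_read` logic rather than calling it directly.

```python
from unittest import mock

ti = mock.Mock(name="task_instance")

# Case 1: the executor implements get_task_log and returns a log.
executor = mock.Mock()
executor.get_task_log.return_value = ("executor log", {"end_of_log": True})
assert executor.get_task_log(ti) == ("executor log", {"end_of_log": True})
executor.get_task_log.assert_called_once_with(ti)

# Case 2: the executor has no implementation (returns None), so the
# handler should fall back to the worker fetch exactly once.
executor = mock.Mock()
executor.get_task_log.return_value = None
worker_fetch = mock.Mock(return_value="worker log")  # stands in for _get_task_log_from_worker

task_log = executor.get_task_log(ti)
if task_log is None:
    task_log = worker_fetch(ti)

executor.get_task_log.assert_called_once_with(ti)
worker_fetch.assert_called_once_with(ti)
```

In the real test you would reach this path by patching `os.path.exists` to return `False` so `_read` skips the local-file branch.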
##########
airflow/utils/log/file_task_handler.py:
##########
@@ -190,74 +220,22 @@ def _read(self, ti: TaskInstance, try_number: int, metadata: dict[str, Any] | No
log = f"*** Failed to load local log file: {location}\n"
log += f"*** {str(e)}\n"
return log, {"end_of_log": True}
- elif self._should_check_k8s(ti.queue):
- try:
- from airflow.kubernetes.kube_client import get_kube_client
-
- kube_client = get_kube_client()
+ else:
- log += f"*** Trying to get logs (last 100 lines) from worker pod {ti.hostname} ***\n\n"
+ log += f"*** Log file does not exist: {location}\n"
+ executor = ExecutorLoader.get_default_executor()
+ task_log = executor.get_task_log(ti)
- res = kube_client.read_namespaced_pod_log(
- name=ti.hostname,
- namespace=conf.get("kubernetes_executor", "namespace"),
- container="base",
- follow=False,
- tail_lines=100,
- _preload_content=False,
- )
+ if isinstance(task_log, tuple):
+ return task_log
- for line in res:
- log += line.decode()
+ if task_log is None:
+ task_log = self._get_task_log_from_worker(ti, log, log_relative_path=log_relative_path)
Review Comment:
We should add a log line here saying that we're now falling back to fetching logs from the worker.
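For example, something along these lines. This is a sketch only: `log` and `task_log` mirror the variables in the diff, the sample path is made up, and the exact message wording is up to you.

```python
# `log` and `task_log` mirror the variables in the diff; the path and
# message wording here are hypothetical suggestions, not final text.
log = "*** Log file does not exist: /path/to/task.log\n"
task_log = None  # executor had no get_task_log implementation
if task_log is None:
    log += "*** Falling back to fetching logs from worker ***\n\n"
```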
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]