xinbinhuang commented on a change in pull request #15419:
URL: https://github.com/apache/airflow/pull/15419#discussion_r615425791
##########
File path: airflow/utils/log/file_task_handler.py
##########
@@ -111,76 +112,16 @@ def _read(self, ti, try_number, metadata=None):  # pylint: disable=unused-argument
         log_relative_path = self._render_filename(ti, try_number)
         location = os.path.join(self.local_base, log_relative_path)
-        log = ""
-
+        executor = conf.get('core', 'executor')
         if os.path.exists(location):
-            try:
-                with open(location) as file:
-                    log += f"*** Reading local file: {location}\n"
-                    log += "".join(file.readlines())
-            except Exception as e:  # pylint: disable=broad-except
-                log = f"*** Failed to load local log file: {location}\n"
-                log += f"*** {str(e)}\n"
-        elif conf.get('core', 'executor') == 'KubernetesExecutor':  # pylint: disable=too-many-nested-blocks
-            try:
-                from airflow.kubernetes.kube_client import get_kube_client
-
-                kube_client = get_kube_client()
-
-                if len(ti.hostname) >= 63:
-                    # Kubernetes takes the pod name and truncates it for the hostname. This truncated hostname
-                    # is returned for the fqdn to comply with the 63 character limit imposed by DNS standards
-                    # on any label of a FQDN.
-                    pod_list = kube_client.list_namespaced_pod(conf.get('kubernetes', 'namespace'))
-                    matches = [
-                        pod.metadata.name
-                        for pod in pod_list.items
-                        if pod.metadata.name.startswith(ti.hostname)
-                    ]
-                    if len(matches) == 1:
-                        if len(matches[0]) > len(ti.hostname):
-                            ti.hostname = matches[0]
-
-                log += '*** Trying to get logs (last 100 lines) from worker pod {} ***\n\n'.format(
-                    ti.hostname
-                )
-
-                res = kube_client.read_namespaced_pod_log(
-                    name=ti.hostname,
-                    namespace=conf.get('kubernetes', 'namespace'),
-                    container='base',
-                    follow=False,
-                    tail_lines=100,
-                    _preload_content=False,
-                )
-
-                for line in res:
-                    log += line.decode()
-
-            except Exception as f:  # pylint: disable=broad-except
-                log += f'*** Unable to fetch logs from worker pod {ti.hostname} ***\n{str(f)}\n\n'
+            log = read_local_log(location)
+        elif executor == executor_constants.KUBERNETES_EXECUTOR or (
+            executor == executor_constants.CELERY_KUBERNETES_EXECUTOR
+            and ti.queue == conf.get('celery_kubernetes_executor', 'kubernetes_queue')
+        ):
+            log = read_kubernetes_pod_log(ti)
         else:
-            url = os.path.join("http://{ti.hostname}:{worker_log_server_port}/log", log_relative_path).format(
-                ti=ti, worker_log_server_port=conf.get('celery', 'WORKER_LOG_SERVER_PORT')
-            )
-            log += f"*** Log file does not exist: {location}\n"
-            log += f"*** Fetching from: {url}\n"
-            try:
-                timeout = None  # No timeout
-                try:
-                    timeout = conf.getint('webserver', 'log_fetch_timeout_sec')
-                except (AirflowConfigException, ValueError):
-                    pass
-
-                response = requests.get(url, timeout=timeout)
-                response.encoding = "utf-8"
-
-                # Check if the resource was properly fetched
-                response.raise_for_status()
-
-                log += '\n' + response.text
-            except Exception as e:  # pylint: disable=broad-except
-                log += f"*** Failed to fetch log file from worker. {str(e)}\n"
+            log = read_celery_worker_log(ti, location, log_relative_path)
Review comment:
The original behavior seems to always read logs from the Celery worker node for the `CELERY_KUBERNETES_EXECUTOR`, so the extra check allows the webserver to fetch logs from the k8s pod when the TI is running on the Kubernetes queue.
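To make the condition concrete: with the `CeleryKubernetesExecutor`, a task is only handed to the embedded `KubernetesExecutor` when its `queue` matches the configured `[celery_kubernetes_executor] kubernetes_queue`, so the check above routes log-fetching the same way the executor routes the task. A hypothetical DAG snippet (the DAG/task names are made up, and I'm assuming the default queue name `kubernetes`):

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG('log_routing_demo', start_date=datetime(2021, 1, 1), schedule_interval=None) as dag:
    # Routed to the embedded KubernetesExecutor because the queue matches
    # [celery_kubernetes_executor] kubernetes_queue, so its logs should come
    # from the k8s pod.
    run_on_k8s = BashOperator(
        task_id='run_on_k8s',
        bash_command='echo hello',
        queue='kubernetes',
    )

    # Runs on a regular Celery worker; its logs are served by the worker's
    # log server, not the Kubernetes API.
    run_on_celery = BashOperator(
        task_id='run_on_celery',
        bash_command='echo hello',
    )
```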
I am not 100% sure about this, as I have never used the `CELERY_KUBERNETES_EXECUTOR` before, and I will need to test it a bit. Meanwhile, I wonder if anyone using the `CELERY_KUBERNETES_EXECUTOR` has already seen this problem.
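For anyone who wants to test this: here is roughly what the `read_kubernetes_pod_log(ti)` path does, i.e. the removed inline block above factored into a function. The helper name comes from the diff, but this body is just my sketch of the old inline logic, so the actual implementation in this PR may differ:

```python
from airflow.configuration import conf
from airflow.kubernetes.kube_client import get_kube_client


def read_kubernetes_pod_log(ti):
    """Fetch the last 100 log lines from the worker pod that ran the TI."""
    log = ''
    try:
        kube_client = get_kube_client()
        namespace = conf.get('kubernetes', 'namespace')

        if len(ti.hostname) >= 63:
            # Kubernetes truncates the pod name to 63 characters for the
            # hostname, so recover the full pod name by prefix match.
            pod_list = kube_client.list_namespaced_pod(namespace)
            matches = [
                pod.metadata.name
                for pod in pod_list.items
                if pod.metadata.name.startswith(ti.hostname)
            ]
            if len(matches) == 1 and len(matches[0]) > len(ti.hostname):
                ti.hostname = matches[0]

        log += f'*** Trying to get logs (last 100 lines) from worker pod {ti.hostname} ***\n\n'
        res = kube_client.read_namespaced_pod_log(
            name=ti.hostname,
            namespace=namespace,
            container='base',
            follow=False,
            tail_lines=100,
            _preload_content=False,
        )
        for line in res:
            log += line.decode()
    except Exception as e:  # pylint: disable=broad-except
        log += f'*** Unable to fetch logs from worker pod {ti.hostname} ***\n{str(e)}\n\n'
    return log
```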
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]