xinbinhuang commented on a change in pull request #15419:
URL: https://github.com/apache/airflow/pull/15419#discussion_r615357893
##########
File path: airflow/utils/log/file_task_handler.py
##########
@@ -111,76 +112,16 @@ def _read(self, ti, try_number, metadata=None): #
pylint: disable=unused-argume
log_relative_path = self._render_filename(ti, try_number)
location = os.path.join(self.local_base, log_relative_path)
- log = ""
-
+ executor = conf.get('core', 'executor')
if os.path.exists(location):
- try:
- with open(location) as file:
- log += f"*** Reading local file: {location}\n"
- log += "".join(file.readlines())
- except Exception as e: # pylint: disable=broad-except
- log = f"*** Failed to load local log file: {location}\n"
- log += f"*** {str(e)}\n"
- elif conf.get('core', 'executor') == 'KubernetesExecutor': # pylint:
disable=too-many-nested-blocks
- try:
- from airflow.kubernetes.kube_client import get_kube_client
-
- kube_client = get_kube_client()
-
- if len(ti.hostname) >= 63:
- # Kubernetes takes the pod name and truncates it for the
hostname. This truncated hostname
- # is returned for the fqdn to comply with the 63 character
limit imposed by DNS standards
- # on any label of a FQDN.
- pod_list =
kube_client.list_namespaced_pod(conf.get('kubernetes', 'namespace'))
- matches = [
- pod.metadata.name
- for pod in pod_list.items
- if pod.metadata.name.startswith(ti.hostname)
- ]
- if len(matches) == 1:
- if len(matches[0]) > len(ti.hostname):
- ti.hostname = matches[0]
-
- log += '*** Trying to get logs (last 100 lines) from worker
pod {} ***\n\n'.format(
- ti.hostname
- )
-
- res = kube_client.read_namespaced_pod_log(
- name=ti.hostname,
- namespace=conf.get('kubernetes', 'namespace'),
- container='base',
- follow=False,
- tail_lines=100,
- _preload_content=False,
- )
-
- for line in res:
- log += line.decode()
-
- except Exception as f: # pylint: disable=broad-except
- log += f'*** Unable to fetch logs from worker pod
{ti.hostname} ***\n{str(f)}\n\n'
+ log = read_local_log(location)
+ elif executor == executor_constants.KUBERNETES_EXECUTOR or (
+ executor == executor_constants.CELERY_KUBERNETES_EXECUTOR
+ and ti.queue == conf.get('celery_kubernetes_executor',
'kubernetes_queue')
Review comment:
      I didn't originally plan to add this, but after separating the logic
out, it became clearer to me that we may need it. I haven't tested this
yet (I will). I'd also like to know whether anyone using the
`CELERY_KUBERNETES_EXECUTOR` has already seen this problem.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]