xinbinhuang commented on a change in pull request #15419:
URL: https://github.com/apache/airflow/pull/15419#discussion_r615357893



##########
File path: airflow/utils/log/file_task_handler.py
##########
@@ -111,76 +112,16 @@ def _read(self, ti, try_number, metadata=None):  # 
pylint: disable=unused-argume
         log_relative_path = self._render_filename(ti, try_number)
         location = os.path.join(self.local_base, log_relative_path)
 
-        log = ""
-
+        executor = conf.get('core', 'executor')
         if os.path.exists(location):
-            try:
-                with open(location) as file:
-                    log += f"*** Reading local file: {location}\n"
-                    log += "".join(file.readlines())
-            except Exception as e:  # pylint: disable=broad-except
-                log = f"*** Failed to load local log file: {location}\n"
-                log += f"*** {str(e)}\n"
-        elif conf.get('core', 'executor') == 'KubernetesExecutor':  # pylint: 
disable=too-many-nested-blocks
-            try:
-                from airflow.kubernetes.kube_client import get_kube_client
-
-                kube_client = get_kube_client()
-
-                if len(ti.hostname) >= 63:
-                    # Kubernetes takes the pod name and truncates it for the 
hostname. This truncated hostname
-                    # is returned for the fqdn to comply with the 63 character 
limit imposed by DNS standards
-                    # on any label of a FQDN.
-                    pod_list = 
kube_client.list_namespaced_pod(conf.get('kubernetes', 'namespace'))
-                    matches = [
-                        pod.metadata.name
-                        for pod in pod_list.items
-                        if pod.metadata.name.startswith(ti.hostname)
-                    ]
-                    if len(matches) == 1:
-                        if len(matches[0]) > len(ti.hostname):
-                            ti.hostname = matches[0]
-
-                log += '*** Trying to get logs (last 100 lines) from worker 
pod {} ***\n\n'.format(
-                    ti.hostname
-                )
-
-                res = kube_client.read_namespaced_pod_log(
-                    name=ti.hostname,
-                    namespace=conf.get('kubernetes', 'namespace'),
-                    container='base',
-                    follow=False,
-                    tail_lines=100,
-                    _preload_content=False,
-                )
-
-                for line in res:
-                    log += line.decode()
-
-            except Exception as f:  # pylint: disable=broad-except
-                log += f'*** Unable to fetch logs from worker pod 
{ti.hostname} ***\n{str(f)}\n\n'
+            log = read_local_log(location)
+        elif executor == executor_constants.KUBERNETES_EXECUTOR or (
+            executor == executor_constants.CELERY_KUBERNETES_EXECUTOR
+            and ti.queue == conf.get('celery_kubernetes_executor', 
'kubernetes_queue')

Review comment:
       I didn't plan to add this originally, but after separating the logic 
out, it became clearer to me that we may need it. I haven't tested this yet 
(will do), but I would also like to know whether anyone using the 
`CELERY_KUBERNETES_EXECUTOR` has already seen this problem.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to