obrie commented on issue #11629:
URL: https://github.com/apache/airflow/issues/11629#issuecomment-718013495


   I don't know that it's the appropriate solution, but for what it's worth 
this is the patch I've applied to my Airflow cluster to get both pod and task 
logs visible in running tasks:
   
   ```python
   from airflow.configuration import conf
   from airflow.utils.log.file_task_handler import FileTaskHandler
   import os
   # Keep a handle on the stock implementation so the patched _read below
   # can delegate to it whenever the pod fallback does not apply.
   original__read = FileTaskHandler._read
   
   
   def _read(self, ti, try_number, metadata=None):
       """Read a task log, falling back to the worker pod when needed.

       If the rendered log file is missing under ``self.local_base`` and
       the configured executor is ``KubernetesExecutor``, the log text is
       assembled from the worker pod instead: first the last 100 lines of
       the ``base`` container's log, then the output of ``cat`` on the
       log file path run inside the pod. In every other case the call is
       delegated unchanged to the original ``FileTaskHandler._read``.

       :param ti: task instance; ``ti.hostname`` is used as the pod name
       :param try_number: try number used to render the log file name
       :param metadata: passed through untouched to the original reader
       :return: ``(log, {'end_of_log': True})`` on the pod path, otherwise
           whatever the original ``_read`` returns
       """
       log_relative_path = self._render_filename(ti, try_number)
       location = os.path.join(self.local_base, log_relative_path)
   
       # Short-circuit order matters: the executor setting is only
       # consulted when the local file is absent.
       use_pod_fallback = (
           not os.path.exists(location)
           and conf.get('core', 'executor') == 'KubernetesExecutor'
       )
       if not use_pod_fallback:
           return original__read(self, ti, try_number, metadata)
   
       log = '*** Trying to get logs from worker pod {} ***\n\n'.format(
           ti.hostname
       )
   
       try:
           # Imported lazily so the patch loads without the kubernetes
           # packages when this branch is never taken.
           from airflow.kubernetes.kube_client import get_kube_client
           from kubernetes.stream import stream as kubernetes_stream
   
           kube_client = get_kube_client()
           namespace = conf.get('kubernetes', 'namespace')
   
           # Fix hostnames that exceeded max: a hostname of 63+ chars is
           # treated as a possibly truncated pod name and resolved by
           # unique prefix match against the pods in the namespace.
           if len(ti.hostname) >= 63:
               pods = kube_client.list_namespaced_pod(namespace)
               matches = [
                   pod.metadata.name
                   for pod in pods.items
                   if pod.metadata.name.startswith(ti.hostname)
               ]
               if len(matches) == 1 and len(matches[0]) > len(ti.hostname):
                   # NOTE(review): this rewrites the hostname on the
                   # passed-in task instance; kept as in the original.
                   ti.hostname = matches[0]
   
           # Get Pod logs
           res = kube_client.read_namespaced_pod_log(
               name=ti.hostname,
               namespace=namespace,
               container='base',
               follow=False,
               tail_lines=100,
               _preload_content=False,
           )
           for line in res:
               log += line.decode()
   
           # Get Task logs
           # NOTE(review): no container is named for the exec call, while
           # the pod-log call above targets 'base' - confirm for pods
           # that run more than one container.
           resp = kubernetes_stream(
               kube_client.connect_get_namespaced_pod_exec,
               ti.hostname,
               namespace,
               command=['cat', location],
               stderr=True,
               stdin=False,
               stdout=True,
               tty=False,
               _preload_content=False,
           )
           try:
               while resp.is_open():
                   resp.update(timeout=1)
                   if resp.peek_stdout():
                       log += resp.read_stdout()
                   if resp.peek_stderr():
                       log += resp.read_stderr()
           finally:
               resp.close()
       except Exception as err:
           # Deliberate best effort: any failure is reported inside the
           # returned log text rather than raised to the caller.
           log += (
               '*** Unable to fetch logs from worker pod {} ***\n'
               '{}\n\n'
           ).format(ti.hostname, str(err))
   
       return log, {'end_of_log': True}
   
   
   # Install the patch: replace the class-level method with the wrapper
   # defined above.
   FileTaskHandler._read = _read
   ```


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to