vba commented on issue #45516: URL: https://github.com/apache/airflow/issues/45516#issuecomment-2587728230
Hi @potiuk, thanks for your reply.

> It simply optimizes for time of those who try to help you to solve your problem.

No problem; the subject is quite complex, and I didn't know how to present the key elements.

> it seems that somewhere the k8s reads logs from remote pod and something does not let it read it.

I've actually researched this problem quite a bit. First, if I roll back to Airflow 2.4.3, the problem disappears. I've also patched the Airflow code with `icecream` to trace the problem step by step. In the log below, I call the API for a task instance that is running; you will notice that the remote log is fetched successfully, and the problem starts afterwards:

```
10.*.*.* - - [09/Jan/2025:10:23:02 +0000] "GET /api/v1/dags/code-python-airflow-sample/dagRuns/manual__2024-12-27T20:29:34.637180+00:00/taskInstances/python_task/logs/8?full_content=false HTTP/1.1" 500 1589 "https://airflow1-sandbox-dv.numberly.dev/dags/code-python-airflow-sample/grid?dag_run_id=manual__2024-12-27T20%3A29%3A34.637180%2B00%3A00&tab=logs&task_id=python_task" "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:133.0) Gecko/20100101 Firefox/133.0"
[2025-01-09 10:23:02 +0000] [70] [INFO] Parent changed, shutting down: <Worker 61>
[2025-01-09 10:23:02 +0000] [70] [INFO] Worker exiting (pid: 61)
10.*.*.* - - [09/Jan/2025:10:23:03 +0000] "GET /health HTTP/1.1" 200 283 "-" "kube-probe/1.30"
10.*.*.* - - [09/Jan/2025:10:23:03 +0000] "GET /health HTTP/1.1" 200 283 "-" "kube-probe/1.30"
LOG ISSUE DEBUG -> log_endpoint.py:60 in get_log() "Starting #get_log": 'Starting #get_log'
LOG ISSUE DEBUG -> log_endpoint.py:62 in get_log() key: '****************'
LOG ISSUE DEBUG -> log_endpoint.py:63 in get_log()- token: None
LOG ISSUE DEBUG -> log_endpoint.py:71 in get_log()- metadata: {}
LOG ISSUE DEBUG -> log_endpoint.py:80 in get_log() metadata: {'download_logs': False}
LOG ISSUE DEBUG -> log_endpoint.py:82 in get_log() task_log_reader: <airflow.utils.log.log_reader.TaskLogReader object at 0x7f539c1ed510>
LOG ISSUE DEBUG -> log_reader.py:119 in log_handler() task_log_reader: 'task'
LOG ISSUE DEBUG -> log_reader.py:120 in log_handler() logging.getLogger("airflow.task").handlers: [<S3TaskHandler (NOTSET)>]
LOG ISSUE DEBUG -> log_reader.py:121 in log_handler() logging.getLogger().handlers: [<RedirectStdHandler <stdout> (NOTSET)>]
LOG ISSUE DEBUG -> log_endpoint.py:96 in get_log() ti: <TaskInstance: code-python-airflow-sample.python_task manual__2024-12-27T20:29:34.637180+00:00 [running]>
LOG ISSUE DEBUG -> log_endpoint.py:103 in get_log() dag: <DAG: code-python-airflow-sample>
LOG ISSUE DEBUG -> log_endpoint.py:107 in get_log() ti.task: <Task(PythonOperator): python_task>
LOG ISSUE DEBUG -> log_endpoint.py:112 in get_log() return_type: 'text/plain'
LOG ISSUE DEBUG -> log_endpoint.py:128 in get_log() logs: <generator object TaskLogReader.read_log_stream at 0x7f5396d165e0>
LOG ISSUE DEBUG -> log_endpoint.py:130 in get_log()- 'Ending'
LOG ISSUE DEBUG -> log_reader.py:78 in read_log_stream() "Starting #read_log_stream": 'Starting #read_log_stream'
LOG ISSUE DEBUG -> log_reader.py:84 in read_log_stream() try_numbers: [8]
LOG ISSUE DEBUG -> log_reader.py:85 in read_log_stream() ti: <TaskInstance: code-python-airflow-sample.python_task manual__2024-12-27T20:29:34.637180+00:00 [running]>
LOG ISSUE DEBUG -> log_reader.py:86 in read_log_stream() metadata: {'download_logs': False}
LOG ISSUE DEBUG -> log_reader.py:92 in read_log_stream() metadata: {'download_logs': False}
LOG ISSUE DEBUG -> log_reader.py:65 in read_log_chunks() self.log_handler: <S3TaskHandler (NOTSET)>
LOG ISSUE DEBUG -> s3_task_handler.py:122 in _read_remote_logs() ti: <TaskInstance: code-python-airflow-sample.python_task manual__2024-12-27T20:29:34.637180+00:00 [running]>
LOG ISSUE DEBUG -> s3_task_handler.py:123 in _read_remote_logs() metadata: {'download_logs': False}
LOG ISSUE DEBUG -> s3_task_handler.py:125 in _read_remote_logs() worker_log_rel_path: 'dag_id=code-python-airflow-sample/run_id=manual__2024-12-27T20:29:34.637180+00:00/task_id=python_task/attempt=8.log'
LOG ISSUE DEBUG -> s3_task_handler.py:130 in _read_remote_logs() bucket: 'my-bucket-logs'
LOG ISSUE DEBUG -> s3_task_handler.py:131 in _read_remote_logs() prefix: 'airflow1-sandbox-dv/dag_id=code-python-airflow-sample/run_id=manual__2024-12-27T20:29:34.637180+00:00/task_id=python_task/attempt=8.log'
LOG ISSUE DEBUG -> s3_task_handler.py:133 in _read_remote_logs() keys: ['airflow1-sandbox-dv/dag_id=code-python-airflow-sample/run_id=manual__2024-12-27T20:29:34.637180+00:00/task_id=python_task/attempt=8.log']
LOG ISSUE DEBUG -> s3_task_handler.py:142 in _read_remote_logs() messages: ['Found logs in s3:', ' * s3://my-bucket-logs/airflow1-sandbox-dv/dag_id=code-python-airflow-sample/run_id=manual__2024-12-27T20:29:34.637180+00:00/task_id=python_task/attempt=8.log']
LOG ISSUE DEBUG -> s3_task_handler.py:143 in _read_remote_logs() logs:
    [2025-01-08T19:29:02.537+0000] {local_task_job_runner.py:123} INFO - ::group::Pre task execution logs
    [2025-01-08T19:29:02.589+0000] {taskinstance.py:2613} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: code-python-airflow-sample.python_task manual__2024-12-27T20:29:34.637180+00:00 [queued]>
    [2025-01-08T19:29:02.602+0000] {taskinstance.py:2613} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: code-python-airflow-sample.python_task manual__2024-12-27T20:29:34.637180+00:00 [queued]>
    [2025-01-08T19:29:02.602+0000] {taskinstance.py:2866} INFO - Starting attempt 8 of 9
    [2025-01-08T19:29:02.627+0000] {taskinstance.py:2889} INFO - Executing <Task(PythonOperator): python_task> on 2024-12-27 20:29:34.637180+00:00
    [2025-01-08T19:29:02.633+0000] {standard_task_runner.py:72} INFO - Started process 9 to run task
    [2025-01-08T19:29:02.641+0000] {standard_task_runner.py:104} INFO - Running: ['airflow', 'tasks', 'run', 'code-python-airflow-sample', 'python_task', 'manual__2024-12-27T20:29:34.637180+00:00', '--job-id', '133', '--raw', '--subdir', 'DAGS_FOLDER/code_python_airflow_sample/airflow_dag.py', '--cfg-path', '/tmp/tmp38klyd4c']
    [2025-01-08T19:29:02.646+0000] {standard_task_runner.py:105} INFO - Job 133: Subtask python_task
    [2025-01-08T19:59:03.705+0000] {taskinstance.py:352} INFO - Marking task as SUCCESS. dag_id=code-python-airflow-sample, task_id=python_task, run_id=manual__2024-12-27T20:29:34.637180+00:00, execution_date=20241227T202934, start_date=20250108T192902, end_date=20250108T195903
    [2025-01-08T19:59:03.763+0000] {local_task_job_runner.py:266} INFO - Task exited with return code 0
    [2025-01-08T19:59:03.857+0000] {taskinstance.py:3895} INFO - 0 downstream tasks scheduled from follow-on schedule check
    [2025-01-08T19:59:03.860+0000] {local_task_job_runner.py:245} INFO - ::endgroup::
[2025-01-09T10:23:03.662+0000] {app.py:1744} ERROR - Exception on /api/v1/dags/code-python-airflow-sample/dagRuns/manual__2024-12-27T20:29:34.637180+00:00/taskInstances/python_task/logs/8 [GET]
Traceback (most recent call last):
  File "/my-loc/lib/python3.10/site-packages/flask/app.py", line 2529, in wsgi_app
    response = self.full_dispatch_request()
  File "/my-loc/lib/python3.10/site-packages/flask/app.py", line 1825, in full_dispatch_request
    rv = self.handle_user_exception(e)
  File "/my-loc/lib/python3.10/site-packages/flask/app.py", line 1823, in full_dispatch_request
    rv = self.dispatch_request()
  File "/my-loc/lib/python3.10/site-packages/flask/app.py", line 1799, in dispatch_request
    return self.ensure_sync(self.view_functions[rule.endpoint])(**view_args)
  File "/my-loc/lib/python3.10/site-packages/connexion/decorators/decorator.py", line 68, in wrapper
    response = function(request)
  File "/my-loc/lib/python3.10/site-packages/connexion/decorators/uri_parsing.py", line 149, in wrapper
    response = function(request)
  File "/my-loc/lib/python3.10/site-packages/connexion/decorators/validation.py", line 399, in wrapper
    return function(request)
  File "/my-loc/lib/python3.10/site-packages/connexion/decorators/response.py", line 113, in wrapper
    return _wrapper(request, response)
  File "/my-loc/lib/python3.10/site-packages/connexion/decorators/response.py", line 90, in _wrapper
    self.operation.api.get_connexion_response(response, self.mimetype)
  File "/my-loc/lib/python3.10/site-packages/connexion/apis/abstract.py", line 366, in get_connexion_response
    return cls._framework_to_connexion_response(response=response, mimetype=mimetype)
  File "/my-loc/lib/python3.10/site-packages/connexion/apis/flask_api.py", line 165, in _framework_to_connexion_response
    body=response.get_data() if not response.direct_passthrough else None,
  File "/my-loc/lib/python3.10/site-packages/werkzeug/wrappers/response.py", line 314, in get_data
    self._ensure_sequence()
  File "/my-loc/lib/python3.10/site-packages/werkzeug/wrappers/response.py", line 376, in _ensure_sequence
    self.make_sequence()
  File "/my-loc/lib/python3.10/site-packages/werkzeug/wrappers/response.py", line 391, in make_sequence
    self.response = list(self.iter_encoded())
  File "/my-loc/lib/python3.10/site-packages/werkzeug/wrappers/response.py", line 50, in _iter_encoded
    for item in iterable:
  File "/my-loc/lib/python3.10/site-packages/airflow/utils/log/log_reader.py", line 94, in read_log_stream
    logs, metadata = self.read_log_chunks(ti, current_try_number, metadata)
  File "/my-loc/lib/python3.10/site-packages/airflow/utils/log/log_reader.py", line 66, in read_log_chunks
    logs, metadatas = self.log_handler.read(ti, try_number, metadata=metadata)
  File "/my-loc/lib/python3.10/site-packages/airflow/utils/log/file_task_handler.py", line 491, in read
    log, out_metadata = self._read(task_instance, try_number_element, metadata)
  File "/my-loc/lib/python3.10/site-packages/airflow/utils/log/file_task_handler.py", line 389, in _read
    response = self._executor_get_task_log(ti, try_number)
  File "/my-loc/lib/python3.10/functools.py", line 981, in __get__
    val = self.func(instance)
  File "/my-loc/lib/python3.10/site-packages/airflow/utils/log/file_task_handler.py", line 346, in _executor_get_task_log
    executor = ExecutorLoader.get_default_executor()
  File "/my-loc/lib/python3.10/site-packages/airflow/executors/executor_loader.py", line 165, in get_default_executor
    default_executor = cls.load_executor(cls.get_default_executor_name())
  File "/my-loc/lib/python3.10/site-packages/airflow/executors/executor_loader.py", line 246, in load_executor
    executor = executor_cls()
  File "/my-loc/lib/python3.10/site-packages/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py", line 135, in __init__
    self.task_queue: Queue[KubernetesJobType] = self._manager.Queue()
  File "/my-loc/lib/python3.10/multiprocessing/managers.py", line 723, in temp
    token, exp = self._create(typeid, *args, **kwds)
  File "/my-loc/lib/python3.10/multiprocessing/managers.py", line 606, in _create
    conn = self._Client(self._address, authkey=self._authkey)
  File "/my-loc/lib/python3.10/multiprocessing/connection.py", line 508, in Client
    answer_challenge(c, authkey)
  File "/my-loc/lib/python3.10/multiprocessing/connection.py", line 752, in answer_challenge
    message = connection.recv_bytes(256)  # reject large message
  File "/my-loc/lib/python3.10/multiprocessing/connection.py", line 216, in recv_bytes
    buf = self._recv_bytes(maxlength)
  File "/my-loc/lib/python3.10/multiprocessing/connection.py", line 414, in _recv_bytes
    buf = self._recv(4)
  File "/my-loc/lib/python3.10/multiprocessing/connection.py", line 379, in _recv
    chunk = read(handle, remaining)
BlockingIOError: [Errno 11] Resource temporarily unavailable
```

As you can see, all the line numbers in the code are slightly offset (because of my `icecream` patches).
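A side note on the `functools.py ... __get__` frame in the traceback: it shows that `_executor_get_task_log` is resolved through `functools.cached_property`, i.e. the executor is only constructed lazily, on the first request that tries to fetch executor task logs for a running task. A minimal stdlib-only sketch of that mechanism (the class and attribute names here are illustrative, not Airflow's real ones):

```python
from functools import cached_property


class LogHandlerSketch:
    """Illustrative stand-in for a handler with a lazy executor attribute."""

    constructions = 0  # counts how many times the "executor" gets built

    @cached_property
    def _executor_task_log_fetcher(self):
        # In the real traceback, this is where ExecutorLoader.get_default_executor()
        # runs and KubernetesExecutor() is constructed -- on first access only.
        LogHandlerSketch.constructions += 1
        return lambda ti, try_number: ([], [])


handler = LogHandlerSketch()
handler._executor_task_log_fetcher  # first access: the expensive construction happens here
handler._executor_task_log_fetcher  # cached in the instance dict: no second construction
print(LogHandlerSketch.constructions)  # -> 1
```

This matches the observed behaviour: the remote S3 read succeeds first, and the failure only appears once the handler falls through to the executor-log path and triggers that first construction.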
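And on the last few frames: `KubernetesExecutor.__init__` calls `self._manager.Queue()`, and creating a manager queue starts a separate `SyncManager` server process and connects to it over a socket. The `answer_challenge()` / `recv_bytes()` frames in the traceback belong to that connection handshake, which is exactly where the `BlockingIOError` (EAGAIN) surfaces. A standalone sketch of the same initialization outside Airflow:

```python
import multiprocessing

# multiprocessing.Manager() spawns a SyncManager server process;
# manager.Queue() returns a proxy object backed by a socket connection
# to that process -- the handshake on this connection is what fails
# with EAGAIN in the traceback above.
manager = multiprocessing.Manager()
task_queue = manager.Queue()

task_queue.put("pod-spec")
print(task_queue.get())  # -> pod-spec

manager.shutdown()
```

So the executor instantiation inside the gunicorn webserver worker is doing real process/socket work per first request, rather than anything specific to the S3 logs themselves.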
