optiluca opened a new issue, #63173:
URL: https://github.com/apache/airflow/issues/63173
### Apache Airflow version
3.1.7
### If "Other Airflow 3 version" selected, which one?
_No response_
### What happened?
I'm running a DAG in which one task is a KubernetesPodOperator (KPO) running in deferrable mode. Because the task can take a couple of hours and I want to see its logs while it runs, I've set `logging_interval=600`.
Since doing this, however, I've been hitting sporadic task failures caused by an `AirflowException`:
```
File /usr/local/lib/python3.12/site-packages/airflow/sdk/execution_time/task_runner.py, line 1068 in run
File /usr/local/lib/python3.12/site-packages/airflow/sdk/execution_time/task_runner.py, line 1477 in _execute_task
File /usr/local/lib/python3.12/site-packages/airflow/sdk/bases/operator.py, line 1633 in resume_execution
File /usr/local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py, line 993 in trigger_reentry
File /usr/local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py, line 1019 in _clean
File /usr/local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py, line 1057 in post_complete_action
File /usr/local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py, line 1129 in cleanup
,AirflowException: Traceback (most recent call last):
  File "/usr/local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/triggers/pod.py", line 183, in run
    event = await self._wait_for_container_completion()
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/triggers/pod.py", line 307, in _wait_for_container_completion
    self.last_log_time = await self.pod_manager.fetch_container_logs_before_current_sec(
                         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/utils/pod_manager.py", line 1076, in fetch_container_logs_before_current_sec
    logs = await self._hook.read_logs(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/tenacity/asyncio/__init__.py", line 189, in async_wrapped
    return await copy(fn, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/tenacity/asyncio/__init__.py", line 111, in __call__
    do = await self.iter(retry_state=retry_state)
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/tenacity/asyncio/__init__.py", line 153, in iter
    result = await action(retry_state)
             ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/tenacity/_utils.py", line 99, in inner
    return call(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/tenacity/__init__.py", line 400, in <lambda>
    self._add_action_func(lambda rs: rs.outcome.result())
                                     ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/concurrent/futures/_base.py", line 449, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/concurrent/futures/_base.py", line 401, in __get_result
    raise self._exception
  File "/usr/local/lib/python3.12/site-packages/tenacity/asyncio/__init__.py", line 114, in __call__
    result = await fn(*args, **kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/hooks/kubernetes.py", line 997, in read_logs
    logs = await v1_api.read_namespaced_pod_log(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/hooks/kubernetes.py", line 98, in call_api
    return await super().call_api(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/kubernetes_asyncio/client/api_client.py", line 209, in __call_api
    response_data.data = response_data.data.decode(encoding)
                         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
UnicodeDecodeError: 'utf-8' codec can't decode bytes in position 16382-16383: unexpected end of data
```
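Judging by the symptoms, my logs occasionally contain data that cannot be decoded as UTF-8, and this brings down the entire task. The "unexpected end of data" message means the fetched log data ended partway through a multi-byte UTF-8 character.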
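To make the failure mode concrete, here is a minimal sketch that reproduces the exact error message from the traceback, assuming (as "unexpected end of data" indicates) that the bytes passed to `response_data.data.decode(encoding)` ended partway through a multi-byte character. The 16382-byte ASCII prefix and the FULL BLOCK character (U+2588, a glyph typical of progress bars) are illustrative choices, and `truncated_chunk` is a hypothetical helper.

```python
# Hypothetical reproduction: a 3-byte UTF-8 character cut off after 2 bytes
# at the very end of the data being decoded.
BLOCK = "\u2588"  # FULL BLOCK, encodes to b"\xe2\x96\x88"


def truncated_chunk(prefix_len: int = 16382) -> bytes:
    """Return prefix_len ASCII bytes followed by the first 2 of BLOCK's 3 bytes."""
    encoded = BLOCK.encode("utf-8")
    return b"a" * prefix_len + encoded[:2]


try:
    # Strict UTF-8 decoding, like the decode call at the bottom of the traceback.
    truncated_chunk().decode("utf-8")
except UnicodeDecodeError as exc:
    print(exc)
```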
I'm mitigating the issue by setting the following environment variables on my KPO task:
```python
env_vars = {
    # Force UTF-8 stdout so the K8s pod trigger's log fetch never hits a
    # UnicodeDecodeError (the provider decodes logs as UTF-8; truncated or
    # invalid bytes cause intermittent failures).
    "PYTHONIOENCODING": "utf-8",
    "LANG": "C.UTF-8",
    "LC_ALL": "C.UTF-8",
    # Use a dumb terminal so progress bars (e.g. tqdm) emit ASCII-only output,
    # avoiding multi-byte chars that can be truncated at chunk boundaries.
    "TERM": "dumb",
}
```
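The mitigation works because every prefix of an ASCII byte string is itself valid UTF-8, so no chunk boundary can split a character, whereas any non-ASCII character spans several bytes and can be cut. The sketch below uses a hypothetical `unsafe_cut_points` helper and illustrative bar strings (not real tqdm output) to list the byte offsets at which truncating a string's UTF-8 encoding makes strict decoding fail.

```python
def unsafe_cut_points(text: str) -> list[int]:
    """Byte offsets at which truncating text's UTF-8 encoding breaks strict decoding."""
    data = text.encode("utf-8")
    bad = []
    for cut in range(len(data) + 1):
        try:
            data[:cut].decode("utf-8")
        except UnicodeDecodeError:
            bad.append(cut)
    return bad


print(unsafe_cut_points("50%|#####     |"))  # ASCII-only bar → []
print(unsafe_cut_points("50%|\u2588\u2588\u2588  |"))  # block glyphs → [5, 6, 8, 9, 11, 12]
```

Each 3-byte block glyph contributes two unsafe offsets (after its first and second byte); the ASCII bar contributes none.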
### What you think should happen instead?
I think invalid UTF-8 in the container logs should be handled more gracefully, rather than bringing down the entire execution of the task.
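One way decoding could be made more graceful, sketched under the assumption that log data arrives as a sequence of byte chunks: an incremental UTF-8 decoder from Python's `codecs` module holds back an incomplete trailing sequence until the next chunk arrives, and `errors="replace"` maps genuinely invalid bytes to U+FFFD instead of raising. `decode_log_chunks` is a hypothetical helper, not part of the cncf.kubernetes provider or kubernetes_asyncio, and where such a fix would live is left open.

```python
import codecs
from collections.abc import Iterable, Iterator


def decode_log_chunks(chunks: Iterable[bytes]) -> Iterator[str]:
    """Decode a stream of byte chunks as UTF-8 without ever raising.

    The incremental decoder buffers an incomplete multi-byte sequence at the
    end of one chunk and completes it with the next; errors="replace" turns
    bytes that are invalid on their own into U+FFFD.
    """
    decoder = codecs.getincrementaldecoder("utf-8")(errors="replace")
    for chunk in chunks:
        text = decoder.decode(chunk)
        if text:
            yield text
    # Flush: a sequence still incomplete at end of stream becomes U+FFFD.
    tail = decoder.decode(b"", final=True)
    if tail:
        yield tail


# Usage: a block glyph split across two chunks, plus one invalid byte.
bar = "\u2588".encode("utf-8")
chunks = [b"50%|" + bar[:2], bar[2:] + b"|\n", b"bad \xff byte\n"]
print("".join(decode_log_chunks(chunks)), end="")
```

For a single response decoded in one shot, the non-incremental equivalent is `data.decode("utf-8", errors="replace")`.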
### How to reproduce
- Prepare a DAG with a KPO task configured in deferrable mode, with `logging_interval` set.
- Have the KPO container produce output such as tqdm progress bars.
- Run the DAG.

In my case, fetching logs every 10 minutes for a task that runs for around 2 hours results in a >50% failure rate due to the error above.
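For the progress-bar step, a container that does not depend on tqdm can emit similar output with a small script. This is an illustrative stand-in: the glyph set imitates tqdm's Unicode bar (a space plus U+258F down to U+2588, each glyph 3 bytes in UTF-8), and `render_bar` and `main` are hypothetical names. The defaults finish in about a second; raise `steps` and `delay` so the run spans several `logging_interval` periods.

```python
import sys
import time

# Space plus partial-block glyphs from 1/8 to 8/8, imitating a tqdm-style bar.
BLOCKS = " \u258f\u258e\u258d\u258c\u258b\u258a\u2589\u2588"


def render_bar(fraction: float, width: int = 40) -> str:
    """Render a Unicode progress bar for 0.0 <= fraction <= 1.0."""
    fraction = min(max(fraction, 0.0), 1.0)
    eighths = round(fraction * width * 8)
    full, part = divmod(eighths, 8)
    bar = BLOCKS[-1] * full
    if full < width:
        # One partial glyph, then pad with spaces to the full width.
        bar += BLOCKS[part] + " " * (width - full - 1)
    return f"{fraction:4.0%}|{bar}|"


def main(steps: int = 200, delay: float = 0.005) -> None:
    for i in range(steps + 1):
        # Redraw in place with a carriage return, as tqdm does on a terminal.
        sys.stdout.write("\r" + render_bar(i / steps))
        sys.stdout.flush()
        time.sleep(delay)
    sys.stdout.write("\n")


if __name__ == "__main__":
    main()
```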
### Operating System
SLES 15-SP7 (this is where the KPO k8s pods are being run)
### Versions of Apache Airflow Providers
apache-airflow-providers-cncf-kubernetes, in whichever version is enforced by `astro/runtime:3.1-12`
### Deployment
Astronomer
### Deployment details
_No response_
### Anything else?
_No response_
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [x] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)