dmateusp opened a new issue #12813:
URL: https://github.com/apache/airflow/issues/12813
**Apache Airflow version**: `1.10.13`
**Kubernetes version (if you are using kubernetes)** (use `kubectl
version`): `v1.15.11-eks`
```
Client Version: version.Info{Major:"1", Minor:"16", GitVersion:"v1.16.0",
GitCommit:"2bd9643cee5b3b3a5ecbd3af49d09018f0773c77", GitTreeState:"clean",
BuildDate:"2019-09-18T14:36:53Z", GoVersion:"go1.12.9", Compiler:"gc",
Platform:"darwin/amd64"}
Server Version: version.Info{Major:"1", Minor:"15+",
GitVersion:"v1.15.11-eks-065dce",
GitCommit:"065dcecfcd2a91bd68a17ee0b5e895088430bd05", GitTreeState:"clean",
BuildDate:"2020-07-16T01:44:47Z", GoVersion:"go1.12.17", Compiler:"gc",
Platform:"linux/amd64"}
```
**Environment**:
- **Cloud provider or hardware configuration**: AWS (EKS)
- **OS** (e.g. from /etc/os-release):
```
PRETTY_NAME="Debian GNU/Linux 10 (buster)"
NAME="Debian GNU/Linux"
VERSION_ID="10"
VERSION="10 (buster)"
VERSION_CODENAME=buster
ID=debian
HOME_URL="https://www.debian.org/"
SUPPORT_URL="https://www.debian.org/support"
BUG_REPORT_URL="https://bugs.debian.org/"
```
- **Kernel** (e.g. `uname -a`): `Linux ddac867b589a 4.19.76-linuxkit #1 SMP
Tue May 26 11:42:35 UTC 2020 x86_64 GNU/Linux`
- **Python**: `Python 3.7.9`
- **Others**: Running on Docker, built on top of the apache/airflow image (`FROM apache/airflow`)
**What happened**:
`xcom.pull(...)` throws a `UnicodeDecodeError` when the task that produced the
XCom is a KubernetesPodOperator:
```
return func(*args, **kwargs)
File
"/home/airflow/.local/lib/python3.7/site-packages/airflow/models/xcom.py", line
161, in get_one
return json.loads(result.value.decode('UTF-8'))
UnicodeDecodeError: 'utf-8' codec can't decode byte 0x80 in position 0:
invalid start byte
```
(full stack trace below)
Note that this happens across all of our DAGs when I try to migrate
from 1.10.10 to 1.10.13.
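For what it's worth, a pickle protocol 2+ payload begins with the byte `0x80`, which is exactly the "invalid start byte" in the error above, so the failing value looks like a pickled blob being handed to the JSON code path. A minimal sketch (plain Python, no Airflow involved) reproducing the same decode failure:

```python
import json
import pickle

# Pickle protocol 2+ payloads begin with the PROTO opcode, byte 0x80.
blob = pickle.dumps({"key1": "value1", "key2": "value2"})
assert blob[0] == 0x80

# Feeding those bytes to the JSON path used in xcom.get_one fails the
# same way as the stack trace: 0x80 is not a valid UTF-8 start byte.
try:
    json.loads(blob.decode("UTF-8"))
except UnicodeDecodeError as exc:
    print(exc)  # 'utf-8' codec can't decode byte 0x80 in position 0: ...
```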
**How to reproduce it**:
Here are two tasks that should reproduce it (part of a DAG I use to
test K8s features while making an update internally):
```python
bash_echo_1 = KubernetesPodOperator(
    task_id="bash_echo_1",
    image="bash:4.4",
    name="bash-echo-1",  # kubernetes pod names do not accept '_'
    cmds=[
        "bash",
        "-c",
        (
            "mkdir -p /airflow/xcom "
            '&& echo \'{"key1":"value1", "key2": "value2"}\' > /airflow/xcom/return.json'
        ),
    ],  # that's how xcom works for KubernetesPodOperator
    # on_failure_callback=alert_opsgenie,  # uncomment to test Opsgenie
    do_xcom_push=True,  # must be True for /airflow/xcom/return.json to be pushed as an XCom
)

bash_echo_2 = KubernetesPodOperator(
    task_id="bash_echo_2",
    name="bash-echo-2",  # kubernetes pod names do not accept '_'
    image="bash:4.4",
    arguments=[
        "echo",
        'key1 was: {{ ti.xcom_pull("bash_echo_1")["key1"] }}',
        ',key2 was: {{ ti.xcom_pull("bash_echo_1")["key2"] }}',
        ',the entire object was: {{ ti.xcom_pull("bash_echo_1") }}',
    ],
)

bash_echo_1 >> bash_echo_2
```
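One thing worth checking (an assumption on my part, not a confirmed diagnosis): which serialization mode the scheduler and workers are configured with. XCom serialization is governed by `enable_xcom_pickling` under `[core]` in `airflow.cfg`; if the side that wrote the XCom pickled it while the side reading it expects JSON, `json.loads(result.value.decode('UTF-8'))` would fail exactly like this:

```ini
[core]
# When True, XCom values are pickled; when False they are JSON-serialized.
# A mismatch between the writer and the reader produces undecodable bytes.
enable_xcom_pickling = False
```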
<details><summary>stack trace</summary>

```
Process DagFileProcessor324119-Process:
Traceback (most recent call last):
File "/usr/local/lib/python3.7/multiprocessing/process.py", line 297, in
_bootstrap
self.run()
File "/usr/local/lib/python3.7/multiprocessing/process.py", line 99, in run
self._target(*self._args, **self._kwargs)
File
"/home/airflow/.local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py",
line 159, in _run_file_processor
pickle_dags)
File
"/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/db.py", line
74, in wrapper
return func(*args, **kwargs)
File
"/home/airflow/.local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py",
line 1620, in process_file
self._process_dags(dagbag, dags, ti_keys_to_schedule)
File
"/home/airflow/.local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py",
line 1299, in _process_dags
self._process_task_instances(dag, tis_out)
File
"/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/db.py", line
74, in wrapper
return func(*args, **kwargs)
File
"/home/airflow/.local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py",
line 793, in _process_task_instances
ready_tis = run.update_state(session=session)
File
"/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/db.py", line
70, in wrapper
return func(*args, **kwargs)
File
"/home/airflow/.local/lib/python3.7/site-packages/airflow/models/dagrun.py",
line 281, in update_state
ready_tis, changed_tis = self._get_ready_tis(scheduleable_tasks,
finished_tasks, session)
File
"/home/airflow/.local/lib/python3.7/site-packages/airflow/models/dagrun.py",
line 340, in _get_ready_tis
session=session):
File
"/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/db.py", line
70, in wrapper
return func(*args, **kwargs)
File
"/home/airflow/.local/lib/python3.7/site-packages/airflow/models/taskinstance.py",
line 659, in are_dependencies_met
session=session):
File
"/home/airflow/.local/lib/python3.7/site-packages/airflow/models/taskinstance.py",
line 683, in get_failed_dep_statuses
dep_context):
File
"/home/airflow/.local/lib/python3.7/site-packages/airflow/ti_deps/deps/base_ti_dep.py",
line 106, in get_dep_statuses
for dep_status in self._get_dep_statuses(ti, session, dep_context):
File
"/home/airflow/.local/lib/python3.7/site-packages/airflow/ti_deps/deps/not_previously_skipped_dep.py",
line 58, in _get_dep_statuses
task_ids=parent.task_id, key=XCOM_SKIPMIXIN_KEY
File
"/home/airflow/.local/lib/python3.7/site-packages/airflow/models/taskinstance.py",
line 1564, in xcom_pull
return pull_fn(task_id=task_ids)
File
"/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/db.py", line
74, in wrapper
return func(*args, **kwargs)
File
"/home/airflow/.local/lib/python3.7/site-packages/airflow/models/xcom.py", line
161, in get_one
return json.loads(result.value.decode('UTF-8'))
UnicodeDecodeError: 'utf-8' codec can't decode byte 0x80 in position 0:
invalid start byte
```

</details>
Note that bash_echo_2 simply does not get scheduled; the stack trace here
comes from the scheduler Pod (we run Airflow with the KubernetesExecutor).
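As a stopgap while debugging, a tolerant reader can handle both formats. This is a hypothetical helper for inspecting raw rows from the xcom table, not part of Airflow's API; the real fix is to align the serialization settings on both sides:

```python
import json
import pickle


def deserialize_xcom(raw: bytes):
    """Try the JSON format first; fall back to pickle for pickled rows.

    Hypothetical helper for inspecting raw xcom table values -- not part
    of Airflow's API. Only unpickle data you trust.
    """
    try:
        return json.loads(raw.decode("UTF-8"))
    except (UnicodeDecodeError, ValueError):
        return pickle.loads(raw)


# Both serialization formats round-trip through the same reader:
payload = {"key1": "value1", "key2": "value2"}
assert deserialize_xcom(json.dumps(payload).encode("UTF-8")) == payload
assert deserialize_xcom(pickle.dumps(payload)) == payload
```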