dmateusp opened a new issue #12813:
URL: https://github.com/apache/airflow/issues/12813


   **Apache Airflow version**: `1.10.13`
   
   
   **Kubernetes version (if you are using kubernetes)** (use `kubectl version`): `v1.15.11-eks`
   
   ```
   Client Version: version.Info{Major:"1", Minor:"16", GitVersion:"v1.16.0", GitCommit:"2bd9643cee5b3b3a5ecbd3af49d09018f0773c77", GitTreeState:"clean", BuildDate:"2019-09-18T14:36:53Z", GoVersion:"go1.12.9", Compiler:"gc", Platform:"darwin/amd64"}
   Server Version: version.Info{Major:"1", Minor:"15+", GitVersion:"v1.15.11-eks-065dce", GitCommit:"065dcecfcd2a91bd68a17ee0b5e895088430bd05", GitTreeState:"clean", BuildDate:"2020-07-16T01:44:47Z", GoVersion:"go1.12.17", Compiler:"gc", Platform:"linux/amd64"}
   ```
   
   **Environment**:
   
   - **Cloud provider or hardware configuration**: AWS (EKS)
   - **OS** (e.g. from /etc/os-release): 
   ```
   PRETTY_NAME="Debian GNU/Linux 10 (buster)"
   NAME="Debian GNU/Linux"
   VERSION_ID="10"
   VERSION="10 (buster)"
   VERSION_CODENAME=buster
   ID=debian
   HOME_URL="https://www.debian.org/"
   SUPPORT_URL="https://www.debian.org/support"
   BUG_REPORT_URL="https://bugs.debian.org/"
   ```
   - **Kernel** (e.g. `uname -a`): `Linux ddac867b589a 4.19.76-linuxkit #1 SMP Tue May 26 11:42:35 UTC 2020 x86_64 GNU/Linux`
   - **Python**:  `Python 3.7.9`
   - **Others**: On Docker, built on top of the apache/airflow image (`FROM apache/airflow`)
   
   **What happened**:
   
   `xcom.pull(...)` throws a `UnicodeDecodeError` when the task that produced the xcom is a `KubernetesPodOperator`:
   
   ```
       return func(*args, **kwargs)
     File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/xcom.py", line 161, in get_one
       return json.loads(result.value.decode('UTF-8'))
   UnicodeDecodeError: 'utf-8' codec can't decode byte 0x80 in position 0: invalid start byte
   ```
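For context on the error itself: byte `0x80` at position 0 is the opcode that opens a pickle protocol 2 stream, so the stored XCom value looks pickled rather than JSON-serialized (my reading of the symptom, not confirmed against the Airflow source). A minimal sketch reproducing the decode failure outside Airflow:

```python
import json
import pickle

# A value serialized with pickle (protocol >= 2) starts with the PROTO
# opcode 0x80 -- the exact byte the traceback complains about.
value = pickle.dumps({"key1": "value1", "key2": "value2"}, protocol=2)
assert value[0] == 0x80

# This mirrors the failing call in xcom.get_one():
try:
    json.loads(value.decode("UTF-8"))
except UnicodeDecodeError as exc:
    print(exc)  # same "invalid start byte" error as in the traceback
```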
   
   (Full stack trace below)
   
   Note that this happens across all of our DAGs when I try to migrate from 1.10.10 to 1.10.13.
   
   **How to reproduce it**:
   
   Here are 2 tasks that should reproduce it (they are part of a DAG I use to test K8s features while making an update internally):
   
   ```python
       bash_echo_1 = KubernetesPodOperator(
           task_id="bash_echo_1",
           image="bash:4.4",
           name="bash-echo-1",  # kubernetes pod names do not accept '_'
           cmds=[
               "bash",
               "-c",
               (
                   "mkdir -p /airflow/xcom "
                   '&& echo \'{"key1":"value1", "key2": "value2"}\' > /airflow/xcom/return.json'
               ),
           ],  # that's how xcom works for KubernetesPodOperator
           # on_failure_callback=alert_opsgenie,  # uncomment to test opsgenie
           # this needs to be True for /airflow/xcom/return.json to be pushed as an xcom object
           do_xcom_push=True,
       )

       bash_echo_2 = KubernetesPodOperator(
           task_id="bash_echo_2",
           name="bash-echo-2",  # kubernetes pod names do not accept '_'
           image="bash:4.4",
           arguments=[
               "echo",
               'key1 was: {{ ti.xcom_pull("bash_echo_1")["key1"] }}',
               ',key2 was: {{ ti.xcom_pull("bash_echo_1")["key2"] }}',
               ',the entire object was: {{ ti.xcom_pull("bash_echo_1") }}',
           ],
       )

       bash_echo_1 >> bash_echo_2
   ```
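For anyone unfamiliar with the mechanism the comments above refer to: the `KubernetesPodOperator` xcom sidecar reads `/airflow/xcom/return.json` from the pod and pushes its JSON-parsed contents as the task's XCom value. A rough sketch of that contract, using a temp directory in place of the pod volume:

```python
import json
import tempfile
from pathlib import Path

# Stand-in for the pod's /airflow/xcom volume.
xcom_dir = Path(tempfile.mkdtemp())

# What bash_echo_1 does: write the result to the well-known path.
(xcom_dir / "return.json").write_text('{"key1":"value1", "key2": "value2"}')

# What the sidecar does: read the file back and JSON-parse it
# before pushing it as the task's XCom.
result = json.loads((xcom_dir / "return.json").read_text())
assert result == {"key1": "value1", "key2": "value2"}
```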
   
   
   <details><summary>stack trace</summary>

   ```
   Process DagFileProcessor324119-Process:
   Traceback (most recent call last):
     File "/usr/local/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
       self.run()
     File "/usr/local/lib/python3.7/multiprocessing/process.py", line 99, in run
       self._target(*self._args, **self._kwargs)
     File "/home/airflow/.local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 159, in _run_file_processor
       pickle_dags)
     File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/db.py", line 74, in wrapper
       return func(*args, **kwargs)
     File "/home/airflow/.local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1620, in process_file
       self._process_dags(dagbag, dags, ti_keys_to_schedule)
     File "/home/airflow/.local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1299, in _process_dags
       self._process_task_instances(dag, tis_out)
     File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/db.py", line 74, in wrapper
       return func(*args, **kwargs)
     File "/home/airflow/.local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 793, in _process_task_instances
       ready_tis = run.update_state(session=session)
     File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/db.py", line 70, in wrapper
       return func(*args, **kwargs)
     File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/dagrun.py", line 281, in update_state
       ready_tis, changed_tis = self._get_ready_tis(scheduleable_tasks, finished_tasks, session)
     File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/dagrun.py", line 340, in _get_ready_tis
       session=session):
     File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/db.py", line 70, in wrapper
       return func(*args, **kwargs)
     File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 659, in are_dependencies_met
       session=session):
     File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 683, in get_failed_dep_statuses
       dep_context):
     File "/home/airflow/.local/lib/python3.7/site-packages/airflow/ti_deps/deps/base_ti_dep.py", line 106, in get_dep_statuses
       for dep_status in self._get_dep_statuses(ti, session, dep_context):
     File "/home/airflow/.local/lib/python3.7/site-packages/airflow/ti_deps/deps/not_previously_skipped_dep.py", line 58, in _get_dep_statuses
       task_ids=parent.task_id, key=XCOM_SKIPMIXIN_KEY
     File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1564, in xcom_pull
       return pull_fn(task_id=task_ids)
     File "/home/airflow/.local/lib/python3.7/site-packages/airflow/utils/db.py", line 74, in wrapper
       return func(*args, **kwargs)
     File "/home/airflow/.local/lib/python3.7/site-packages/airflow/models/xcom.py", line 161, in get_one
       return json.loads(result.value.decode('UTF-8'))
   UnicodeDecodeError: 'utf-8' codec can't decode byte 0x80 in position 0: invalid start byte
   ```

   </details>
   
   Note that bash_echo_2 simply never gets scheduled; the stack trace above comes from the scheduler pod (we run Airflow with the KubernetesExecutor).
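In case it's useful while triaging: a tolerant deserializer that tries JSON first and falls back to pickle would be able to read rows written under either serialization scheme. This is a hypothetical helper for illustration, not Airflow's actual API:

```python
import json
import pickle

def deserialize_xcom(value: bytes):
    """Try JSON first; fall back to pickle for rows written by an
    older version (hypothetical helper, not part of Airflow)."""
    try:
        return json.loads(value.decode("UTF-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        return pickle.loads(value)

# Reads both kinds of stored values:
assert deserialize_xcom(json.dumps({"a": 1}).encode()) == {"a": 1}
assert deserialize_xcom(pickle.dumps({"a": 1})) == {"a": 1}
```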
   

