atul-astronomer opened a new issue, #47907:
URL: https://github.com/apache/airflow/issues/47907
### Apache Airflow version
main (development)
### If "Other Airflow 2 version" selected, which one?
_No response_
### What happened?
The scheduler crashes with a `TypeError` while deserializing an XCom value that is already a dict during `xcom_pull`:
File "/opt/airflow/airflow/models/dagrun.py", line 943, in update_state
    info = self.task_instance_scheduling_decisions(session)
File "/opt/airflow/airflow/utils/session.py", line 98, in wrapper
    return func(*args, **kwargs)
File "/opt/airflow/airflow/models/dagrun.py", line 1123, in task_instance_scheduling_decisions
    schedulable_tis, changed_tis, expansion_happened = self._get_ready_tis(
File "/opt/airflow/airflow/models/dagrun.py", line 1222, in _get_ready_tis
    if not schedulable.are_dependencies_met(session=session, dep_context=dep_context):
File "/opt/airflow/airflow/utils/session.py", line 98, in wrapper
    return func(*args, **kwargs)
File "/opt/airflow/airflow/models/taskinstance.py", line 2301, in are_dependencies_met
    for dep_status in self.get_failed_dep_statuses(dep_context=dep_context, session=session):
File "/opt/airflow/airflow/models/taskinstance.py", line 2325, in get_failed_dep_statuses
    for dep_status in dep.get_dep_statuses(self, session, dep_context):
File "/opt/airflow/airflow/ti_deps/deps/base_ti_dep.py", line 116, in get_dep_statuses
    yield from self._get_dep_statuses(ti, session, cxt)
File "/opt/airflow/airflow/ti_deps/deps/not_previously_skipped_dep.py", line 61, in _get_dep_statuses
    prev_result = ti.xcom_pull(
File "/opt/airflow/airflow/utils/session.py", line 98, in wrapper
    return func(*args, **kwargs)
File "/opt/airflow/airflow/models/taskinstance.py", line 3360, in xcom_pull
    return _xcom_pull(
File "/opt/airflow/airflow/utils/session.py", line 98, in wrapper
    return func(*args, **kwargs)
File "/opt/airflow/airflow/models/taskinstance.py", line 553, in _xcom_pull
    return XComModel.deserialize_value(first)
File "/opt/airflow/airflow/models/xcom.py", line 338, in deserialize_value
    return json.loads(result.value, cls=XComDecoder)
File "/usr/local/lib/python3.9/json/__init__.py", line 339, in loads
    raise TypeError(f'the JSON object must be str, bytes or bytearray, '
TypeError: the JSON object must be str, bytes or bytearray, not dict
### What you think should happen instead?
_No response_
### How to reproduce
Run the below DAG:
```python
from airflow.models import DAG
from airflow.providers.standard.operators.python import PythonOperator, BranchPythonOperator
from airflow.providers.standard.operators.bash import BashOperator
from airflow.providers.standard.operators.empty import EmptyOperator
from pendulum import today
from dags.plugins.airflow_dag_introspection import assert_the_task_states
# Markdown shown in the Airflow UI; passed to the DAG via ``doc_md`` below.
docs = """
####Purpose
This dag tests that the BranchPythonOperator works correctly by testing that
xcoms is only returned from the branch that successfully runs it's tasks.\n
It also makes assertions of the tasks states to ensure the tasks that should
be skipped are actually skipped.\n
####Expected Behavior
This dag has 7 tasks 5 of which are expected to succeed and 2 of which are
expected to be skipped.\n
This dag should pass.
"""
def branch_this_way():
    """Branch selector: always choose the ``branch1`` task."""
    chosen = "branch1"
    return chosen
def branch1(val):
    """Return *val* unchanged (identity callable for the first branch)."""
    result = val
    return result
def branch2(val):
    """Return *val* unchanged (identity callable for the second branch)."""
    passthrough = val
    return passthrough
def xcoms_check(**context):
    """Verify XCom values after branching.

    The branch that ran (``branch1``) must have pushed its return value,
    and the skipped branch (``branch2``) must have pushed nothing.

    Expects the task instance under ``context['ti']`` (supplied by Airflow
    at runtime). Raises ``AssertionError`` if either XCom value is wrong.
    """
    ti = context['ti']
    val_to_check = ti.xcom_pull(task_ids="branch1", key="return_value")
    should_be_none = ti.xcom_pull(task_ids="branch2", key="return_value")
    assert val_to_check == {"this": "branch", "should": "return"}
    # `is None`, not `== None` (PEP 8 E711): identity is the correct check.
    assert should_be_none is None
# DAG wiring: the brancher picks "branch1" (see branch_this_way), so the
# branch2 subtree should end up skipped and push no XCom.
with DAG(
    dag_id="branch_python_operator",
    start_date=today('UTC').add(days=-1),
    schedule=None,
    doc_md=docs,
    tags=['core']
) as dag:
    # Selects which downstream branch runs; callable always returns "branch1".
    brancher = BranchPythonOperator(
        task_id="branch_python_operator",
        python_callable=branch_this_way,
    )
    # NOTE: rebinding `branch1` shadows the module-level function of the same
    # name; the RHS is evaluated first, so python_callable still receives the
    # function object. Same applies to `branch2` below.
    branch1 = PythonOperator(
        task_id="branch1",
        python_callable=branch1,
        op_args=[{"this": "branch", "should": "return"}],
    )
    branch2 = PythonOperator(
        task_id="branch2",
        python_callable=branch2,
        op_args=[{"this": "branch", "shouldn't": "return"}]
    )
    d0 = EmptyOperator(task_id="dummy0")
    # Sleeps 25s — per the task_id, presumably to give the skip state time to
    # settle before the check tasks run (TODO: confirm intent).
    b0 = BashOperator(
        task_id="sleep_so_task_skips",
        bash_command="sleep 25"
    )
    # Asserts the expected XCom values (see xcoms_check above).
    check_xcoms = PythonOperator(
        task_id="check_xcoms",
        python_callable=xcoms_check,
    )
    # Asserts the final state of each task via the project helper
    # assert_the_task_states (imported from dags.plugins).
    check_states = PythonOperator(
        task_id="check_task_states",
        python_callable=assert_the_task_states,
        op_kwargs={"task_ids_and_assertions": {
            "branch_python_operator": "success",
            "branch1": "success",
            "sleep_so_task_skips": "success",
            "branch2": "skipped",
            "dummy0": "skipped"
        }
        }
    )
    brancher >> [branch1, branch2]
    branch1 >> b0 >> check_xcoms >> check_states
    branch2 >> d0
```
### Operating System
Linux
### Versions of Apache Airflow Providers
_No response_
### Deployment
Other
### Deployment details
_No response_
### Anything else?
_No response_
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [x] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]