atul-astronomer opened a new issue, #47907:
URL: https://github.com/apache/airflow/issues/47907

   ### Apache Airflow version
   
   main (development)
   
   ### If "Other Airflow 2 version" selected, which one?
   
   _No response_
   
   ### What happened?
   
   The scheduler crashes while deserialising a dict value during an XCom pull.
   
     File "/opt/airflow/airflow/models/dagrun.py", line 943, in update_state
       info = self.task_instance_scheduling_decisions(session)
     File "/opt/airflow/airflow/utils/session.py", line 98, in wrapper
       return func(*args, **kwargs)
     File "/opt/airflow/airflow/models/dagrun.py", line 1123, in task_instance_scheduling_decisions
       schedulable_tis, changed_tis, expansion_happened = self._get_ready_tis(
     File "/opt/airflow/airflow/models/dagrun.py", line 1222, in _get_ready_tis
       if not schedulable.are_dependencies_met(session=session, dep_context=dep_context):
     File "/opt/airflow/airflow/utils/session.py", line 98, in wrapper
       return func(*args, **kwargs)
     File "/opt/airflow/airflow/models/taskinstance.py", line 2301, in are_dependencies_met
       for dep_status in self.get_failed_dep_statuses(dep_context=dep_context, session=session):
     File "/opt/airflow/airflow/models/taskinstance.py", line 2325, in get_failed_dep_statuses
       for dep_status in dep.get_dep_statuses(self, session, dep_context):
     File "/opt/airflow/airflow/ti_deps/deps/base_ti_dep.py", line 116, in get_dep_statuses
       yield from self._get_dep_statuses(ti, session, cxt)
     File "/opt/airflow/airflow/ti_deps/deps/not_previously_skipped_dep.py", line 61, in _get_dep_statuses
       prev_result = ti.xcom_pull(
     File "/opt/airflow/airflow/utils/session.py", line 98, in wrapper
       return func(*args, **kwargs)
     File "/opt/airflow/airflow/models/taskinstance.py", line 3360, in xcom_pull
       return _xcom_pull(
     File "/opt/airflow/airflow/utils/session.py", line 98, in wrapper
       return func(*args, **kwargs)
     File "/opt/airflow/airflow/models/taskinstance.py", line 553, in _xcom_pull
       return XComModel.deserialize_value(first)
     File "/opt/airflow/airflow/models/xcom.py", line 338, in deserialize_value
       return json.loads(result.value, cls=XComDecoder)
     File "/usr/local/lib/python3.9/json/__init__.py", line 339, in loads
       raise TypeError(f'the JSON object must be str, bytes or bytearray, '
   TypeError: the JSON object must be str, bytes or bytearray, not dict
   
   
   ### What you think should happen instead?
   
   _No response_
   
   ### How to reproduce
   
   Run the below DAG:
   
   ```python
   from airflow.models import DAG
   from airflow.providers.standard.operators.python import PythonOperator, BranchPythonOperator
   from airflow.providers.standard.operators.bash import BashOperator
   from airflow.providers.standard.operators.empty import EmptyOperator
   from pendulum import today
   
   from dags.plugins.airflow_dag_introspection import assert_the_task_states
   
   # Markdown description of this DAG; handed to the DAG constructor below as
   # ``doc_md``. The text is runtime data and is kept exactly as reported.
   docs = """
   ####Purpose
   This dag tests that the BranchPythonOperator works correctly by testing that 
xcoms is only returned from the branch that successfully runs it's tasks.\n
   It also makes assertions of the tasks states to ensure the tasks that should 
be skipped are actually skipped.\n
   ####Expected Behavior
   This dag has 7 tasks 5 of which are expected to succeed and 2 of which are 
expected to be skipped.\n
   This dag should pass.
   
   """
   
   def branch_this_way():
       """Return the task_id that the branching task should follow.

       Always selects ``"branch1"``.
       """
       chosen_task_id = "branch1"
       return chosen_task_id
   
   def branch1(val):
       """Hand ``val`` back unchanged (the very same object)."""
       passthrough = val
       return passthrough
   
   def branch2(val):
       """Hand ``val`` back unchanged (the very same object)."""
       passthrough = val
       return passthrough
   
   def xcoms_check(**context):
       """Assert that only the ``branch1`` task produced a return-value XCom.

       Reads the task instance from ``context["ti"]`` and pulls the
       ``return_value`` XCom of the tasks ``branch1`` and ``branch2``.

       Raises:
           KeyError: if ``context`` has no ``"ti"`` entry.
           AssertionError: if the ``branch1`` value is not the expected dict,
               or if the ``branch2`` value is anything other than ``None``.
       """
       ti = context["ti"]
       val_to_check = ti.xcom_pull(task_ids="branch1", key="return_value")
       should_be_none = ti.xcom_pull(task_ids="branch2", key="return_value")

       assert val_to_check == {"this": "branch", "should": "return"}
       # Identity, not equality: an object whose __eq__ accepts None must not
       # be mistaken for a missing XCom.
       assert should_be_none is None
   
   # Reproduction DAG: one branching task chooses between two downstream
   # branches; later tasks check the XComs and the final task states.
   with DAG(
       dag_id="branch_python_operator",
       start_date=today('UTC').add(days=-1),
       schedule=None,
       doc_md=docs,
       tags=['core']
   ) as dag:
   
       # Branching task; its callable (branch_this_way) returns "branch1".
       brancher = BranchPythonOperator(
           task_id="branch_python_operator",
           python_callable=branch_this_way,
       )
   
       # NOTE: this assignment rebinds the module-level name ``branch1`` from the
       # function to the task object. ``python_callable=branch1`` is evaluated
       # before the rebinding, so the operator still receives the function.
       branch1 = PythonOperator(
           task_id="branch1",
           python_callable=branch1,
           op_args=[{"this": "branch", "should": "return"}],
       )
   
       # Same rebinding pattern as above, for ``branch2``.
       branch2 = PythonOperator(
           task_id="branch2",
           python_callable=branch2,
           op_args=[{"this": "branch", "shouldn't": "return"}]
       )
   
       # Placed downstream of branch2 (see the wiring at the bottom).
       d0 = EmptyOperator(task_id="dummy0")
   
       # Runs ``sleep 25`` between branch1 and the XCom check.
       b0 = BashOperator(
           task_id="sleep_so_task_skips",
           bash_command="sleep 25"
       )
   
       # Pulls the return-value XComs of branch1 and branch2 and asserts on them.
       check_xcoms = PythonOperator(
           task_id="check_xcoms",
           python_callable=xcoms_check,
       )
   
       # Passes a task_id -> expected-state mapping to assert_the_task_states
       # (imported from dags.plugins.airflow_dag_introspection; its behaviour is
       # not shown here -- presumably it compares actual states against these).
       check_states = PythonOperator(
           task_id="check_task_states",
           python_callable=assert_the_task_states,
           op_kwargs={"task_ids_and_assertions": {
               "branch_python_operator": "success",
               "branch1": "success",
               "sleep_so_task_skips": "success",
               "branch2": "skipped",
               "dummy0": "skipped"
           }
           }
       )
   
   
   # Dependency wiring: the brancher fans out to both branches; the branch1
   # path continues through the sleep, the XCom check and the state check,
   # while the branch2 path ends at dummy0.
   brancher >> [branch1, branch2]
   branch1 >> b0 >> check_xcoms >> check_states
   branch2 >> d0
   
   
   ``` 
   
   ### Operating System
   
   Linux
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Other
   
   ### Deployment details
   
   _No response_
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to