GitHub user skiedude edited a discussion: What is the correct way to unit test 
the PythonBranchOperator

We are currently on airflow 2.4.3. In testing what its going to take to update 
to higher versions, starting with 2.5.0, we hit `RecursionError: maximum 
recursion depth exceeded in comparison` in some of our UnitTests that have 
worked up until now.

Using pytest and following test
```
    (
        'should_check_depreciation_info',
        [{'host1': {'is_physical_device': '0'}, 'host2': {'is_physical_device': 
'1'}}],
        {'get_depreciation_info', 'process_servers'}
    ),

]
...
def test_should_check_depreciation_info(xcom_patch, task_id, xcom_return_val, 
expected_downstream, dagbag, create_dag_task_context):
    """
    Test return_server should_check_depreciation_info logic gate
    Check that the expected_downstream is the correct downstream tasks returned
    """
    xcom_patch.return_value = xcom_return_val
    task, context = create_dag_task_context('return_server', task_id, dagbag)
    task.execute(context)
    xcom_return = task.xcom_pull(task_ids=task_id, key='skipmixin_key', 
context=context)
    assert set(xcom_return['followed']) == expected_downstream
    ```

We use the following helper function to create the task and context to run with 
`task.execute(context)`

    ```
    def create_dag_task_context():
    """
    Creates a manual dagrun of the given dag, and returns the task and context
    """
    def _create(dag_id, task_id, dagbag):
        dag = dagbag.get_dag(dag_id)
        execution_date = datetime.now()
        dagrun = dag.create_dagrun(
            execution_date=execution_date,
            state=DagRunState.RUNNING,
            run_type=DagRunType.MANUAL
        )
        task = dag.get_task(task_id)
        taskinst = dagrun.get_task_instance(task_id)
        taskinst.task = task
        context = taskinst.get_template_context()
        return task, context
    return _create
```

    The task `should_check_depreciation_info` is a branch task. It can either 
run the next step `get_depreciation_info` or skip it and go straight to 
`process_servers`
  
  The official error is 
  ```
  2025-01-10 12:39:53,638 INFO     
/home/airflow/.local/lib/python3.9/site-packages/airflow/operators/python.py:210:
 in execute
2025-01-10 12:39:53,638 INFO         branch = super().execute(context)
2025-01-10 12:39:53,638 INFO     
/home/airflow/.local/lib/python3.9/site-packages/airflow/operators/python.py:173:
 in execute
2025-01-10 12:39:53,638 INFO         self.op_kwargs = 
self.determine_kwargs(context)
2025-01-10 12:39:53,639 INFO     
/home/airflow/.local/lib/python3.9/site-packages/airflow/models/baseoperator.py:1008:
 in __setattr__
2025-01-10 12:39:53,639 INFO         self.set_xcomargs_dependencies()
2025-01-10 12:39:53,639 INFO     
/home/airflow/.local/lib/python3.9/site-packages/airflow/models/baseoperator.py:1115:
 in set_xcomargs_dependencies
2025-01-10 12:39:53,639 INFO         XComArg.apply_upstream_relationship(self, 
arg)
2025-01-10 12:39:53,639 INFO     
/home/airflow/.local/lib/python3.9/site-packages/airflow/models/xcom_arg.py:121:
 in apply_upstream_relationship
2025-01-10 12:39:53,639 INFO         for operator, _ in 
XComArg.iter_xcom_references(arg):
2025-01-10 12:39:53,639 INFO     
/home/airflow/.local/lib/python3.9/site-packages/airflow/models/xcom_arg.py:108:
 in iter_xcom_references
2025-01-10 12:39:53,639 INFO         yield from 
XComArg.iter_xcom_references(elem)
2025-01-10 12:39:53,639 INFO     
/home/airflow/.local/lib/python3.9/site-packages/airflow/models/xcom_arg.py:111:
 in iter_xcom_references
2025-01-10 12:39:53,639 INFO         yield from 
XComArg.iter_xcom_references(getattr(arg, attr))
2025-01-10 12:39:53,639 INFO     
/home/airflow/.local/lib/python3.9/site-packages/airflow/models/xcom_arg.py:108:
 in iter_xcom_references
2025-01-10 12:39:53,640 INFO         yield from 
XComArg.iter_xcom_references(elem)
2025-01-10 12:39:53,640 INFO     E   RecursionError: maximum recursion depth 
exceeded in comparison
2025-01-10 12:39:53,640 INFO     !!! Recursion detected (same locals & position)
```

In looking in the release notes for 2.5.0, this seems to be the cause 
https://github.com/apache/airflow/pull/27771

In debugging through the code, its pushing the whole context into the 
`op_kwargs` key here 
https://github.com/apache/airflow/blob/2.5.0/airflow/utils/operator_helpers.py#L155-L165

Then when it goes through XcomArg recursion it just endlessly loops through 
itself. 

Any ideas on how to better setup this test?

GitHub link: https://github.com/apache/airflow/discussions/45565

----
This is an automatically sent email for [email protected].
To unsubscribe, please send an email to: [email protected]

Reply via email to