Pad71 opened a new issue, #41675:
URL: https://github.com/apache/airflow/issues/41675
### Apache Airflow version
2.10.0
### If "Other Airflow 2 version" selected, which one?
_No response_
### What happened?
An error is displayed (see below) in the log when processing the branch
operator. Even if the branch operator behaves correctly in terms of
functionality, the error in the log is very misleading
error text:
default_host
[2024-08-22, 14:17:04 CEST] {local_task_job_runner.py:123} ▶ Pre task
execution logs
[2024-08-22, 14:17:04 CEST] {baseoperator.py:405} WARNING -
BranchPythonOperator.execute cannot be called outside TaskInstance!
[2024-08-22, 14:17:05 CEST] {python.py:240} INFO - Done. Returned value was:
['False_DAY_IN_INTERVAL']
[2024-08-22, 14:17:05 CEST] {branch.py:38} INFO - Branch into
['False_DAY_IN_INTERVAL']
[2024-08-22, 14:17:05 CEST] {skipmixin.py:230} INFO - Following branch
('False_DAY_IN_INTERVAL',)
[2024-08-22, 14:17:05 CEST] {skipmixin.py:278} INFO - Skipping tasks
[('True_DAY_IN_INTERVAL', -1)]
[2024-08-22, 14:17:05 CEST] {taskinstance.py:340} ▼ Post task execution logs
**[2024-08-22, 14:17:05 CEST] {taskinstance.py:352} INFO - Marking task as
SUCCESS**. dag_id=TEST_BUILT_IN_FCTS, task_id=IF_DAY_IN_INTERVAL_1_TO_20,
run_id=manual__2024-08-22T14:16:35+02:00, execution_date=20240822T121635,
start_date=20240822T121704, end_date=20240822T121705
[2024-08-22, 14:17:05 CEST] {local_task_job_runner.py:261} INFO - Task
exited with return code 0
**[2024-08-22, 14:17:05 CEST] {taskinstance.py:3916} ERROR** - Error
scheduling downstream tasks. Skipping it as this is entirely optional
optimisation. There might be various reasons for it, please take a look at the
stack trace to figure out if the root cause can be diagnosed and fixed. See the
issue https://github.com/apache/airflow/issues/39717 for details and an example
problem. If you would like to get help in solving root cause, open discussion
with all details with your managed service support or in Airflow repository.
Traceback (most recent call last):
File
"/usr/local/lib/python3.11/site-packages/airflow/models/taskinstance.py", line
3912, in schedule_downstream_tasks
return TaskInstance._schedule_downstream_tasks(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/airflow/utils/session.py",
line 94, in wrapper
return func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^
File
"/usr/local/lib/python3.11/site-packages/airflow/models/taskinstance.py", line
3861, in _schedule_downstream_tasks
partial_dag = task.dag.partial_subset(
^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/airflow/models/dag.py", line
2645, in partial_subset
dag.task_dict = {
^
File "/usr/local/lib/python3.11/site-packages/airflow/models/dag.py", line
2646, in <dictcomp>
t.task_id: _deepcopy_task(t)
^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/airflow/models/dag.py", line
2643, in _deepcopy_task
return copy.deepcopy(t, memo)
^^^^^^^^^^^^^^^^^^^^^^
File "/usr/lib64/python3.11/copy.py", line 153, in deepcopy
y = copier(memo)
^^^^^^^^^^^^
File
"/usr/local/lib/python3.11/site-packages/airflow/models/baseoperator.py", line
1388, in __deepcopy__
setattr(result, k, copy.deepcopy(v, memo))
^^^^^^^^^^^^^^^^^^^^^^
File "/usr/lib64/python3.11/copy.py", line 172, in deepcopy
y = _reconstruct(x, memo, *rv)
^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/lib64/python3.11/copy.py", line 271, in _reconstruct
state = deepcopy(state, memo)
^^^^^^^^^^^^^^^^^^^^^
File "/usr/lib64/python3.11/copy.py", line 146, in deepcopy
y = copier(x, memo)
^^^^^^^^^^^^^^^
File "/usr/lib64/python3.11/copy.py", line 231, in _deepcopy_dict
y[deepcopy(key, memo)] = deepcopy(value, memo)
^^^^^^^^^^^^^^^^^^^^^
File "/usr/lib64/python3.11/copy.py", line 172, in deepcopy
y = _reconstruct(x, memo, *rv)
^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/lib64/python3.11/copy.py", line 271, in _reconstruct
state = deepcopy(state, memo)
^^^^^^^^^^^^^^^^^^^^^
File "/usr/lib64/python3.11/copy.py", line 146, in deepcopy
y = copier(x, memo)
^^^^^^^^^^^^^^^
File "/usr/lib64/python3.11/copy.py", line 231, in _deepcopy_dict
y[deepcopy(key, memo)] = deepcopy(value, memo)
^^^^^^^^^^^^^^^^^^^^^
File "/usr/lib64/python3.11/copy.py", line 172, in deepcopy
y = _reconstruct(x, memo, *rv)
^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/lib64/python3.11/copy.py", line 271, in _reconstruct
state = deepcopy(state, memo)
^^^^^^^^^^^^^^^^^^^^^
File "/usr/lib64/python3.11/copy.py", line 146, in deepcopy
y = copier(x, memo)
^^^^^^^^^^^^^^^
File "/usr/lib64/python3.11/copy.py", line 231, in _deepcopy_dict
y[deepcopy(key, memo)] = deepcopy(value, memo)
^^^^^^^^^^^^^^^^^^^^^
File "/usr/lib64/python3.11/copy.py", line 172, in deepcopy
y = _reconstruct(x, memo, *rv)
^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/lib64/python3.11/copy.py", line 271, in _reconstruct
state = deepcopy(state, memo)
^^^^^^^^^^^^^^^^^^^^^
File "/usr/lib64/python3.11/copy.py", line 146, in deepcopy
y = copier(x, memo)
^^^^^^^^^^^^^^^
File "/usr/lib64/python3.11/copy.py", line 231, in _deepcopy_dict
y[deepcopy(key, memo)] = deepcopy(value, memo)
^^^^^^^^^^^^^^^^^^^^^
File "/usr/lib64/python3.11/copy.py", line 161, in deepcopy
rv = reductor(4)
^^^^^^^^^^^
TypeError: cannot pickle '_cffi_backend.FFI' object
[2024-08-22, 14:17:05 CEST] {local_task_job_runner.py:240} ▲▲▲ Log group end
### What you think should happen instead?
There should be no error in the error log.
### How to reproduce
run some dag with branch operator
### Operating System
Kubernetes on Unix platform
### Versions of Apache Airflow Providers
_No response_
### Deployment
Official Apache Airflow Helm Chart
### Deployment details
_No response_
### Anything else?
_No response_
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]