Pad71 opened a new issue, #41675:
URL: https://github.com/apache/airflow/issues/41675

   ### Apache Airflow version
   
   2.10.0
   
   ### If "Other Airflow 2 version" selected, which one?
   
   _No response_
   
   ### What happened?
   
An error is displayed (see below) in the log when the branch 
operator is processed. Even though the branch operator behaves correctly in terms of 
functionality, the error in the log is very misleading.
   
   error text:
   default_host
   [2024-08-22, 14:17:04 CEST] {local_task_job_runner.py:123} ▶ Pre task 
execution logs
   [2024-08-22, 14:17:04 CEST] {baseoperator.py:405} WARNING - 
BranchPythonOperator.execute cannot be called outside TaskInstance!
   [2024-08-22, 14:17:05 CEST] {python.py:240} INFO - Done. Returned value was: 
['False_DAY_IN_INTERVAL']
   [2024-08-22, 14:17:05 CEST] {branch.py:38} INFO - Branch into 
['False_DAY_IN_INTERVAL']
   [2024-08-22, 14:17:05 CEST] {skipmixin.py:230} INFO - Following branch 
('False_DAY_IN_INTERVAL',)
   [2024-08-22, 14:17:05 CEST] {skipmixin.py:278} INFO - Skipping tasks 
[('True_DAY_IN_INTERVAL', -1)]
   [2024-08-22, 14:17:05 CEST] {taskinstance.py:340} ▼ Post task execution logs
   **[2024-08-22, 14:17:05 CEST] {taskinstance.py:352} INFO - Marking task as 
SUCCESS**. dag_id=TEST_BUILT_IN_FCTS, task_id=IF_DAY_IN_INTERVAL_1_TO_20, 
run_id=manual__2024-08-22T14:16:35+02:00, execution_date=20240822T121635, 
start_date=20240822T121704, end_date=20240822T121705
   [2024-08-22, 14:17:05 CEST] {local_task_job_runner.py:261} INFO - Task 
exited with return code 0
   **[2024-08-22, 14:17:05 CEST] {taskinstance.py:3916} ERROR** - Error 
scheduling downstream tasks. Skipping it as this is entirely optional 
optimisation. There might be various reasons for it, please take a look at the 
stack trace to figure out if the root cause can be diagnosed and fixed. See the 
issue https://github.com/apache/airflow/issues/39717 for details and an example 
problem. If you would like to get help in solving root cause, open discussion 
with all details with your managed service support or in Airflow repository.
   Traceback (most recent call last):
     File 
"/usr/local/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 
3912, in schedule_downstream_tasks
       return TaskInstance._schedule_downstream_tasks(
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.11/site-packages/airflow/utils/session.py", 
line 94, in wrapper
       return func(*args, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^
     File 
"/usr/local/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 
3861, in _schedule_downstream_tasks
       partial_dag = task.dag.partial_subset(
                     ^^^^^^^^^^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.11/site-packages/airflow/models/dag.py", line 
2645, in partial_subset
       dag.task_dict = {
                       ^
     File "/usr/local/lib/python3.11/site-packages/airflow/models/dag.py", line 
2646, in <dictcomp>
       t.task_id: _deepcopy_task(t)
                  ^^^^^^^^^^^^^^^^^
     File "/usr/local/lib/python3.11/site-packages/airflow/models/dag.py", line 
2643, in _deepcopy_task
       return copy.deepcopy(t, memo)
              ^^^^^^^^^^^^^^^^^^^^^^
     File "/usr/lib64/python3.11/copy.py", line 153, in deepcopy
       y = copier(memo)
           ^^^^^^^^^^^^
     File 
"/usr/local/lib/python3.11/site-packages/airflow/models/baseoperator.py", line 
1388, in __deepcopy__
       setattr(result, k, copy.deepcopy(v, memo))
                          ^^^^^^^^^^^^^^^^^^^^^^
     File "/usr/lib64/python3.11/copy.py", line 172, in deepcopy
       y = _reconstruct(x, memo, *rv)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/usr/lib64/python3.11/copy.py", line 271, in _reconstruct
       state = deepcopy(state, memo)
               ^^^^^^^^^^^^^^^^^^^^^
     File "/usr/lib64/python3.11/copy.py", line 146, in deepcopy
       y = copier(x, memo)
           ^^^^^^^^^^^^^^^
     File "/usr/lib64/python3.11/copy.py", line 231, in _deepcopy_dict
       y[deepcopy(key, memo)] = deepcopy(value, memo)
                                ^^^^^^^^^^^^^^^^^^^^^
     File "/usr/lib64/python3.11/copy.py", line 172, in deepcopy
       y = _reconstruct(x, memo, *rv)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/usr/lib64/python3.11/copy.py", line 271, in _reconstruct
       state = deepcopy(state, memo)
               ^^^^^^^^^^^^^^^^^^^^^
     File "/usr/lib64/python3.11/copy.py", line 146, in deepcopy
       y = copier(x, memo)
           ^^^^^^^^^^^^^^^
     File "/usr/lib64/python3.11/copy.py", line 231, in _deepcopy_dict
       y[deepcopy(key, memo)] = deepcopy(value, memo)
                                ^^^^^^^^^^^^^^^^^^^^^
     File "/usr/lib64/python3.11/copy.py", line 172, in deepcopy
       y = _reconstruct(x, memo, *rv)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/usr/lib64/python3.11/copy.py", line 271, in _reconstruct
       state = deepcopy(state, memo)
               ^^^^^^^^^^^^^^^^^^^^^
     File "/usr/lib64/python3.11/copy.py", line 146, in deepcopy
       y = copier(x, memo)
           ^^^^^^^^^^^^^^^
     File "/usr/lib64/python3.11/copy.py", line 231, in _deepcopy_dict
       y[deepcopy(key, memo)] = deepcopy(value, memo)
                                ^^^^^^^^^^^^^^^^^^^^^
     File "/usr/lib64/python3.11/copy.py", line 172, in deepcopy
       y = _reconstruct(x, memo, *rv)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/usr/lib64/python3.11/copy.py", line 271, in _reconstruct
       state = deepcopy(state, memo)
               ^^^^^^^^^^^^^^^^^^^^^
     File "/usr/lib64/python3.11/copy.py", line 146, in deepcopy
       y = copier(x, memo)
           ^^^^^^^^^^^^^^^
     File "/usr/lib64/python3.11/copy.py", line 231, in _deepcopy_dict
       y[deepcopy(key, memo)] = deepcopy(value, memo)
                                ^^^^^^^^^^^^^^^^^^^^^
     File "/usr/lib64/python3.11/copy.py", line 161, in deepcopy
       rv = reductor(4)
            ^^^^^^^^^^^
   TypeError: cannot pickle '_cffi_backend.FFI' object
   [2024-08-22, 14:17:05 CEST] {local_task_job_runner.py:240} ▲▲▲ Log group end
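   For context, the traceback above is the generic Python behaviour when `copy.deepcopy` hits an object that is neither copyable nor picklable: deepcopy falls back to the pickle protocol (`rv = reductor(4)`), which raises `TypeError` for handles like `_cffi_backend.FFI`. A minimal stdlib-only sketch of the same failure path, using a `threading.Lock` as a stand-in for the FFI object (the `Hook` class is illustrative, not Airflow code):

   ```python
   import copy
   import threading


   class Hook:
       """Stand-in for an operator attribute holding an unpicklable handle."""

       def __init__(self):
           # Plays the same role as the _cffi_backend.FFI object in the traceback:
           # no __deepcopy__, and pickling it raises TypeError.
           self.conn = threading.Lock()


   try:
       copy.deepcopy(Hook())
   except TypeError as exc:
       print(exc)  # cannot pickle '_thread.lock' object
   ```

   This suggests the error comes from `partial_subset` deep-copying a task whose attributes (directly or via a hook/connection) hold such a handle, not from the branching logic itself.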
   
   ### What you think should happen instead?
   
No error should appear in the task log.
   
   ### How to reproduce
   
Run any DAG that uses a branch operator (e.g. `BranchPythonOperator`).
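   A minimal DAG along these lines should reproduce it when run in an Airflow 2.10 deployment (the `dag_id` and `task_id`s are made up; this is a sketch, not the reporter's actual DAG):

   ```python
   from datetime import datetime

   from airflow import DAG
   from airflow.operators.empty import EmptyOperator
   from airflow.operators.python import BranchPythonOperator

   with DAG(
       dag_id="branch_error_demo",  # illustrative name
       start_date=datetime(2024, 1, 1),
       schedule=None,
   ) as dag:

       def choose(**_):
           # Always take the "false" branch, mirroring the log above.
           return "false_branch"

       branch = BranchPythonOperator(task_id="branch", python_callable=choose)
       true_branch = EmptyOperator(task_id="true_branch")
       false_branch = EmptyOperator(task_id="false_branch")

       branch >> [true_branch, false_branch]
   ```

   After triggering the DAG, check the branch task's log for the "Error scheduling downstream tasks" traceback.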
   
   ### Operating System
   
   Kubernetes on Unix platform
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Official Apache Airflow Helm Chart
   
   ### Deployment details
   
   _No response_
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
