minhadona opened a new issue, #27859:
URL: https://github.com/apache/airflow/issues/27859

   ### Apache Airflow version
   
   Other Airflow 2 version (please specify below)
   
   ### What happened
   
   Airflow 2.3.3:
   
   Use of .**_expand_** method to create tasks dynamically is returning this 
message as import error:
   
   ```
   Broken DAG: [/opt/airflow/dags/repo/dags/dag_validation_conciliacao/dag.py] 
Traceback (most recent call last):
     File 
"/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskmixin.py", 
line 233, in set_downstream
       self._set_relatives(task_or_task_list, upstream=False, 
edge_modifier=edge_modifier)
     File 
"/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskmixin.py", 
line 174, in _set_relatives
       task_object.update_relative(self, not upstream)
   ```
   **AttributeError: '_TaskDecorator' object has no attribute 
'update_relative'**
   
   I have 4 tasks: 
   `run_validation_for_pix >> run_validation_for_boleto >> get_results >> 
check_and_build_slack_alert`
   
   But for get_results, I need to be called more than 1 time. So before 
declaring the workflow, i set:
   
   ```
   list_tasks_get_results = [task for task in dag.task_ids if 'validation' in 
task]
       # i need all task_ids containing 'validation' on its name to be passed 
as parameter
   get_results = get_result.expand(task_idd = [list_tasks_get_results])
   run_validation_for_pix >> run_validation_for_boleto >> get_results >> 
check_and_build_slack_alert
   ```
   
   
   
   ### What you think should happen instead
   
   It should create dynamically one task for every task_id that was on the list 
passed by parameter to .expand method. 
   
   ### How to reproduce
   
   from airflow.decorators import task
   from airflow import DAG
   [...]
   
   with DAG (
       dag_id='conciliacao_hours_gap_validation',
       schedule_interval='@daily',
       start_date=datetime(2021, 1, 1),
       catchup=False,
       default_args={
           "owner": "donus",
           "start_date": "2021-08-15"
           }
   ) as dag:
   
       task1 = OperatorExternal(Databricks, 
                    task_id = "validation_1" #IMPORTANT TO HAVE VALIDATION IN 
ITS NAME
   )
       task2 = OtherExternalOperator(Databricks, 
                    task_id = "validation_2"
   )
   
       list_tasks_get_results = [task for task in dag.task_ids if 'validation' 
in task] # ONLY TASKS THAT CONTAINS "VALIDATION"
   # list that has to iterate and be passed as argument to the task below
   
       @task()
       def get_result(task_idd, *, ti=None):
            print(f'PRINT task: {task_idd}')
       get_results = get_result.expand(task_idd = [list_tasks_get_results])
   
       @task(task_id='send_slack_alert')
       def check_and_build_slack_alert(ti=None):
           print(f'pulling xcoms of databricks results') 
           
           xcom1 = ti.xcom_pull(f'get_result_validation_boleto', 
key=f'databricks_output_validation_boleto}')
           xcom2 = ti.xcom_pull(f'get_result_validation_pix', 
key=f'databricks_output_validation_pix}')
           print(f'|xcom1|: {xcom1}, |xcom2|: {xcom2}')
   
       check_and_build_slack_alert = check_and_build_slack_alert()
   
   
       #WORFLOW      
   run_validation_for_pix >> run_validation_for_boleto >> get_results >> 
check_and_build_slack_alert
   
   # so get_results should call get_result the same as much as the list of 
task_ids containing "validation on its name", in this case, 2 tasks, so we 
should have get_results [] , with get_results [validation_1] and get_results 
[validation_2] 
     
   
   
   
   
   
   ### Operating System
   
   ubuntu
   
   ### Versions of Apache Airflow Providers
   
   apache-airflow-providers-snowflake==2.5.0
   apache-airflow-providers-databricks==2.2.0
   apache-airflow-providers-mongo==2.3.1
   apache-airflow-providers-mysql==2.2.1
   apache-airflow-providers-microsoft-mssql==3.0.0
   
   ### Deployment
   
   Other
   
   ### Deployment details
   
   it's on Kubernetes, hosted on Azure
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to