minhadona opened a new issue, #27859:
URL: https://github.com/apache/airflow/issues/27859
### Apache Airflow version
Other Airflow 2 version (please specify below)
### What happened
Airflow 2.3.3:
Using the `.expand()` method to create tasks dynamically returns this message as an import error:
```
Broken DAG: [/opt/airflow/dags/repo/dags/dag_validation_conciliacao/dag.py]
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskmixin.py", line 233, in set_downstream
    self._set_relatives(task_or_task_list, upstream=False, edge_modifier=edge_modifier)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskmixin.py", line 174, in _set_relatives
    task_object.update_relative(self, not upstream)
```
**AttributeError: '_TaskDecorator' object has no attribute 'update_relative'**
I have 4 tasks:
`run_validation_for_pix >> run_validation_for_boleto >> get_results >> check_and_build_slack_alert`
But `get_results` needs to be called more than once, so before declaring the workflow I set:
```
list_tasks_get_results = [task for task in dag.task_ids if 'validation' in task]
# I need all task_ids containing 'validation' in their name to be passed as a parameter
get_results = get_result.expand(task_idd=[list_tasks_get_results])

run_validation_for_pix >> run_validation_for_boleto >> get_results >> check_and_build_slack_alert
```
### What you think should happen instead
It should dynamically create one task for every task_id in the list passed as a parameter to the `.expand()` method.
### How to reproduce
```
from airflow.decorators import task
from airflow import DAG
[...]
with DAG(
    dag_id='conciliacao_hours_gap_validation',
    schedule_interval='@daily',
    start_date=datetime(2021, 1, 1),
    catchup=False,
    default_args={
        "owner": "donus",
        "start_date": "2021-08-15"
    }
) as dag:
    task1 = OperatorExternal(Databricks,
        task_id="validation_1"  # IMPORTANT TO HAVE VALIDATION IN ITS NAME
    )
    task2 = OtherExternalOperator(Databricks,
        task_id="validation_2"
    )

    # Only the tasks whose task_id contains "validation"; this list has to
    # be iterated and passed as an argument to the task below
    list_tasks_get_results = [task for task in dag.task_ids if 'validation' in task]

    @task()
    def get_result(task_idd, *, ti=None):
        print(f'PRINT task: {task_idd}')

    get_results = get_result.expand(task_idd=[list_tasks_get_results])

    @task(task_id='send_slack_alert')
    def check_and_build_slack_alert(ti=None):
        print('pulling xcoms of databricks results')
        xcom1 = ti.xcom_pull('get_result_validation_boleto',
                             key='databricks_output_validation_boleto')
        xcom2 = ti.xcom_pull('get_result_validation_pix',
                             key='databricks_output_validation_pix')
        print(f'|xcom1|: {xcom1}, |xcom2|: {xcom2}')

    check_and_build_slack_alert = check_and_build_slack_alert()

    # WORKFLOW
    run_validation_for_pix >> run_validation_for_boleto >> get_results >> check_and_build_slack_alert
```
So `get_results` should call `get_result` once per task_id containing "validation" in its name; in this case there are 2 such tasks, so we should have `get_results []` with `get_results [validation_1]` and `get_results [validation_2]`.
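The expected fan-out can be simulated in plain Python: the mapped task behaves like mapping the task function over the argument list, producing one call per element (this is a sketch, not the Airflow API):

```python
def get_result(task_idd):
    # Stand-in for the body of the @task-decorated function
    return f'PRINT task: {task_idd}'

# Hypothetical task_ids collected from the DAG
list_tasks_get_results = ["validation_1", "validation_2"]

# One mapped task instance per task_id in the list
results = [get_result(t) for t in list_tasks_get_results]
print(results)
```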
### Operating System
Ubuntu
### Versions of Apache Airflow Providers
apache-airflow-providers-snowflake==2.5.0
apache-airflow-providers-databricks==2.2.0
apache-airflow-providers-mongo==2.3.1
apache-airflow-providers-mysql==2.2.1
apache-airflow-providers-microsoft-mssql==3.0.0
### Deployment
Other
### Deployment details
It's on Kubernetes, hosted on Azure.
### Anything else
_No response_
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)