FabioCoder opened a new issue, #28535:
URL: https://github.com/apache/airflow/issues/28535
### Apache Airflow version
Other Airflow 2 version (please specify below)
### What happened
Airflow 2.4.2
No tasks are mapped and no downstream tasks are executed, although the list
to be mapped is not empty. This only happens sometimes and not regularly.
Extract from the log of the task that contains the list to be mapped:
```
AIRFLOW_CTX_DAG_RUN_ID=dataset_triggered__2022-12-21T02:56:51.044765+00:00
[2022-12-21, 05:31:11 CET] {python.py:177} INFO - Done. Returned value was:
['20e565f2-6492-4fe3-b4d7-f79f1646eaec', ...
'2d9dbd70-7ad7-4dab-a635-1248082b9a46']
[2022-12-21, 05:31:11 CET] {taskinstance.py:1401} INFO - Marking task as
SUCCESS. dag_id=qlik_reload_dag, task_id=qlik_app_tasks.get_tasks_by_tag,
execution_date=20221221T025651, start_date=20221221T043110,
end_date=20221221T043111
[2022-12-21, 05:31:11 CET] {local_task_job.py:164} INFO - Task exited with
return code 0
[2022-12-21, 05:31:11 CET] {local_task_job.py:273} INFO - 0 downstream tasks
scheduled from follow-on schedule check
```
For this task, there is no XCom message containing the returned list,
although the function has returned an list.
### What you think should happen instead
The array has a length of 85, so 85 tasks should have been mapped.
### How to reproduce
**custom/operators.py:**
```
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from custom.hooks import CustomHook
class CustomOperator(BaseOperator):
@apply_defaults
def __init__(self, conn_id, app_task_id, **kwargs):
super(QlikSenseStartTaskOperator, self).__init__(**kwargs)
self._conn_id = conn_id
self._app_task_id= app_task_id
def execute(self, context):
hook = CustomHook(conn_id=self._conn_id)
try:
...
finally:
hook.close()
```
**custom/utils.py :**
```
from airflow.decorators import task
from custom.hooks import CustomHook
import json
@task
def get_tasks_by_tag(conn_id, tag_id):
hook = CustomHook(conn_id=conn_id)
try:
tasks = json.loads(hook.get_tasks(tag_id))
finally:
hook.close()
task_ids = []
for task in tasks:
task_ids.append(task.get('id'))
return task_ids
```
**dag.py:**
```
from airflow import DAG
from custom.operators import CustomOperator
from custom.utils import get_tasks_by_tag
with DAG(
...
) as dag:
t1 = CustomOperator.\
partial(task_id='start_task', conn_id='xxxx').\
expand(app_task_id=get_tasks_by_tag(
'xxxx', tag_id))
t1
```
### Operating System
Debian GNU/Linux 11 (bullseye)
### Versions of Apache Airflow Providers
_No response_
### Deployment
Virtualenv installation
### Deployment details
_No response_
### Anything else
_No response_
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]