Desdroid commented on issue #55759:
URL: https://github.com/apache/airflow/issues/55759#issuecomment-3302143873
Even more interestingly: slightly changing my example DAG to use batches of
numbers results in the `_add_10` task of the task group being executed
successfully. However, the downstream logging task is not executed and ends up
with an `upstream failed` status, even though all upstream tasks were successful:
```python
import pendulum
from datetime import datetime
from airflow.sdk import task, task_group, dag
import logging

logger = logging.getLogger(__name__)


@dag(
    dag_id="test_dag",
    dag_display_name="test dynamic task group mapping",
    schedule=None,
    start_date=datetime(
        2021, 6, 27, 0, 0, 0, tzinfo=pendulum.timezone("Europe/Berlin")
    ),
    catchup=False,
    description="1.0.0",
)
def test_dag():
    @task
    def get_nums():
        return [[1, 2, 3], [4, 5, 6], [7, 8]]

    @task
    def times_2(nums):
        return [num * 2 for num in nums]

    @task
    def add_10(nums):
        return [num + 10 for num in nums]

    @task
    def log_num_batch(text, num_batch):
        logger.info(f"{text}{num_batch}")

    @task_group
    def process_number_batch(num_batch):
        _times_2 = times_2(num_batch)
        _add_10 = add_10(_times_2)
        return _add_10

    _get_nums = get_nums()
    # _times_2 = times_2.expand(num=_get_nums)
    # _add_10 = add_10.expand(num=_times_2)
    _processed_numbers = process_number_batch.expand(num_batch=_get_nums)
    log_num_batch.partial(text="Got number batch result: ").expand(
        num_batch=_processed_numbers
    )


instance = test_dag()
```
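For reference, here is a minimal plain-Python sketch (no Airflow involved) of what I would expect the three mapped `log_num_batch` instances to receive if the mapping worked. It only reuses the function bodies from the DAG above; the names `batches` and `expected` are introduced just for this illustration.

```python
# Plain-Python stand-ins for the task bodies in the DAG above.
def times_2(nums):
    return [num * 2 for num in nums]


def add_10(nums):
    return [num + 10 for num in nums]


# Same input as get_nums() returns in the DAG.
batches = [[1, 2, 3], [4, 5, 6], [7, 8]]

# One task group instance per batch: times_2 followed by add_10.
expected = [add_10(times_2(batch)) for batch in batches]

# One log line per mapped batch, mirroring log_num_batch.
for batch_result in expected:
    print(f"Got number batch result: {batch_result}")
```

So the expectation is three log lines, one per batch, rather than a downstream task that never runs.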