inytar commented on issue #28973:
URL: https://github.com/apache/airflow/issues/28973#issuecomment-1397505498
> @inytar, it works for me in the current main. No skipping. What are your
> other settings like?
I've seen the issue on different settings/environments:
- Production (first noticed the issue here):
  - k8s cluster
  - Postgresql DB
  - k8s executor
  - Airflow 2.5.0
- Local Docker:
  - local executor
  - Postgresql DB
  - Airflow 2.5.0
- Local standalone:
  - Main (commit: 23da4daaa0)
  - `airflow standalone`
  - Fresh db & config
It seems to me like some kind of race condition. Could you try upping the
number of `CHAIN_TASKS`? If I set this to 1, I only see one task being skipped;
if I set it to 2, I see both tasks being skipped.
I've updated the code a bit to add a workaround. This just adds two empty
tasks (one per task group) before the merge. When this is done, the tasks
aren't skipped anymore (also useful for anyone else who might hit this):
```python
from datetime import datetime

from airflow import DAG
from airflow.decorators import task
from airflow.operators.empty import EmptyOperator
from airflow.utils.task_group import TaskGroup

# With only one chained task, only 1 of the `skipped` tasks is skipped.
# Adding extra chained tasks results in both `skipped` tasks being skipped,
# but no earlier tasks are ever skipped.
CHAIN_TASKS = 2

# Add workaround
WORKAROUND = False


@task()
def add(x, y):
    return x, y


with DAG(
    dag_id="test_skip",
    schedule=None,
    start_date=datetime(2023, 1, 13),
) as dag:
    init = EmptyOperator(task_id="init_task")
    final = EmptyOperator(task_id="final")

    for i in range(2):
        with TaskGroup(f"task_group_{i}") as tg:
            chain_task = [i]
            for j in range(CHAIN_TASKS):
                chain_task = (
                    add.override(task_id=f"add_{j}").partial(x=j).expand(y=chain_task)
                )
            skipped_task = (
                add.override(task_id="skipped").partial(x=i).expand(y=chain_task)
            )

        init >> tg

        # Workaround: Adding an empty normal task before the merge step
        # fixes the issue.
        if WORKAROUND:
            workaround = EmptyOperator(task_id=f"workaround_{i}")
            tg >> workaround
            next = workaround
        else:
            next = tg

        # Task isn't skipped if final (merging task) is removed.
        next >> final
```
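For anyone reading along without an Airflow install, here is a rough plain-Python sketch of the dependency graph I'd expect the DAG above to produce. It assumes that `init >> tg` attaches to the first task in each group, that `tg >> ...` attaches from the group's last task (`skipped`), and that task ids inside a group are prefixed with the group id. `build_edges` and `upstream_of` are helper names made up for this illustration, not Airflow APIs, and the sketch says nothing about why the skipping happens.

```python
from typing import List, Tuple

Edge = Tuple[str, str]


def build_edges(chain_tasks: int = 2, workaround: bool = False, groups: int = 2) -> List[Edge]:
    """Return (upstream, downstream) task-id pairs mirroring the repro DAG's shape."""
    edges: List[Edge] = []
    for i in range(groups):
        prefix = f"task_group_{i}"
        # Tasks inside the group, in dependency order: add_0 -> ... -> skipped.
        chain = [f"{prefix}.add_{j}" for j in range(chain_tasks)] + [f"{prefix}.skipped"]
        # Assumption: `init >> tg` wires init_task to the group's first task.
        edges.append(("init_task", chain[0]))
        edges.extend(zip(chain, chain[1:]))
        last = chain[-1]
        if workaround:
            # The workaround puts one plain (unmapped) task between group and merge.
            edges.append((last, f"workaround_{i}"))
            last = f"workaround_{i}"
        edges.append((last, "final"))
    return edges


def upstream_of(task_id: str, edges: List[Edge]) -> List[str]:
    """List the direct upstream task ids of `task_id`, sorted for stable output."""
    return sorted(src for src, dst in edges if dst == task_id)


if __name__ == "__main__":
    print(upstream_of("final", build_edges(workaround=False)))
    print(upstream_of("final", build_edges(workaround=True)))
```

Under those assumptions, the only structural difference the workaround makes is that `final` merges two plain `workaround_{i}` tasks instead of merging the two mapped `skipped` tasks directly.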
I'm OoO till Monday, Jan 30th, so I won't be able to help much till then.
When I'm back, I'm happy to test, etc.!