inytar commented on issue #28973:
URL: https://github.com/apache/airflow/issues/28973#issuecomment-1397505498

   > @inytar, it works for me in the current main. No skipping. What are your other settings like?
   
   I've seen the issue on different settings/environments:
   - Production (first noticed the issue here):
      - k8s cluster
      - Postgresql DB
      - k8s executor
      - Airflow 2.5.0
   - Local Docker:
     - local executor
     - Postgresql DB
     - Airflow 2.5.0
   - Local standalone:
     - Main (commit: 23da4daaa0)
     - `airflow standalone`
     - Fresh db & config
   
   It looks like some kind of race condition to me. Could you try increasing `CHAIN_TASKS`? If I set it to 1, only one of the `skipped` tasks is skipped; if I set it to 2, both are skipped.
   
   I've updated the code a bit to add a workaround (set `WORKAROUND = True`). It just adds two empty tasks, one after each task group, before the merge. With those in place the tasks aren't skipped anymore (also useful for anyone else who might hit this):
   
   ```python
   from datetime import datetime
   
   from airflow import DAG
   from airflow.decorators import task
   from airflow.utils.task_group import TaskGroup
   from airflow.operators.empty import EmptyOperator
   
   # With only one chained task, only one of the `skipped` tasks is skipped.
   # With extra chained tasks, both `skipped` tasks are skipped, but
   # no earlier tasks are ever skipped.
   CHAIN_TASKS = 2
   
   # Set to True to enable the workaround (an EmptyOperator after each task group).
   WORKAROUND = False
   
   
   @task()
   def add(x, y):
       return x, y
   
   
   with DAG(
       dag_id="test_skip",
       schedule=None,
       start_date=datetime(2023, 1, 13),
   ) as dag:
   
       init = EmptyOperator(task_id="init_task")
       final = EmptyOperator(task_id="final")
   
       for i in range(2):
           with TaskGroup(f"task_group_{i}") as tg:
               chain_task = [i]
               for j in range(CHAIN_TASKS):
                   chain_task = (
                       add.override(task_id=f"add_{j}").partial(x=j).expand(y=chain_task)
                   )
               skipped_task = (
                   add.override(task_id="skipped").partial(x=i).expand(y=chain_task)
               )
   
           init >> tg
   
           # Workaround: adding a plain empty task before the merge step fixes the issue.
           if WORKAROUND:
               workaround = EmptyOperator(task_id=f"workaround_{i}")
               tg >> workaround
               upstream = workaround  # renamed from `next` to avoid shadowing the builtin
           else:
               upstream = tg
   
           # The task isn't skipped if final (the merging task) is removed.
           upstream >> final
   ```
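
To make the shape of the reproduction easier to see, here is a minimal plain-Python sketch (no Airflow import) that lists the dependency edges the DAG above should produce. The helper name `build_edges` is hypothetical, and it assumes Airflow's default behaviour of prefixing task ids with the task group id and of wiring `init >> tg` / `tg >> ...` to the group's root and leaf tasks. It only models the structure; it does not reproduce or explain the skipping itself.

```python
from __future__ import annotations


def build_edges(chain_tasks: int, workaround: bool) -> list[tuple[str, str]]:
    """Return (upstream, downstream) task-id pairs mirroring the DAG above.

    Hypothetical helper for illustration only; not part of Airflow.
    """
    edges: list[tuple[str, str]] = []
    for i in range(2):
        group = f"task_group_{i}"
        # Mapped tasks inside the group: add_0 -> add_1 -> ... -> skipped
        ids = [f"{group}.add_{j}" for j in range(chain_tasks)]
        ids.append(f"{group}.skipped")
        edges.extend(zip(ids, ids[1:]))

        # `init >> tg` attaches init_task to the group's root task.
        edges.append(("init_task", ids[0]))

        # `tg >> ...` attaches the group's leaf task downstream.
        leaf = ids[-1]
        if workaround:
            edges.append((leaf, f"workaround_{i}"))
            leaf = f"workaround_{i}"
        edges.append((leaf, "final"))
    return edges


if __name__ == "__main__":
    for upstream, downstream in build_edges(chain_tasks=2, workaround=False):
        print(f"{upstream} >> {downstream}")
```

Under these assumptions, with the workaround off the direct upstreams of `final` are the two mapped `skipped` tasks; with it on, they are the two plain `workaround_{i}` tasks.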
   
   I'm OoO until Monday, Jan 30th, so I won't be able to help much before then. When I'm back I'm happy to test, etc.!

