GitHub user lezaf created a discussion: Orchestration of multi-step workflow failures handlers
Hey everyone,

I'm working on a dynamic Airflow DAG where each item goes through a sequence of mapped tasks inside a task group (e.g., step_1 -> step_2 -> step_3). Each step may fail for specific items, and when that happens I want to route the original item to a corresponding failure handler, e.g. to perform some compensation steps.

Below is an example of the desired DAG outline:

<img width="1077" height="389" alt="image" src="https://github.com/user-attachments/assets/b4936218-eb24-4fb4-908f-3c604ba82156" />

And a part of the underlying code, simplified:

```py
@task_group
def processing_group(item):

    @task
    def step_1(item):
        if should_fail(item):
            raise ValueError("step 1 failed")
        return item

    @task(trigger_rule="all_failed")
    def step_1_fail_handler(step1_output):  # <--- I want this task to receive the original input
        pass

    # rest of steps implementation

    step1_res = step_1(item)
    step1_failed = step_1_fail_handler(step1_res)

    # rest of steps wiring

    return {
        "step1_failed": step1_failed,
        "step2_failed": step2_failed,
        "step3_failed": step3_failed,
        "step3_success": step3_res,
    }
```

## Problem

When the mapped task step_1 fails, Airflow does not return any output, so the failure handler cannot access the original item (or, even better, an enriched version of it, as my requirements call for) via the step result.

I have settled on a workaround using the _current context_ and _xcom_push_, something like:

```py
@task_group
def processing_group(item):

    @task
    def step_1(item):
        if should_fail(item):
            # ============ Context manipulation (start) ============
            # Push the enriched item before raising, since a failed task returns nothing
            context = get_current_context()
            context['ti'].xcom_push(
                key='return_value',
                value={
                    **item,
                    "step_1_failed_flag": True,
                    "error": "An error message",
                }
            )
            # ============ Context manipulation (end) ============
            raise ValueError("step 1 failed")
        return item

    step1_res = step_1(item)

    # rest

    return {
        "step1_res": step1_res,
        "step2_res": step2_res,
        "step3_res": step3_res,
    }
```

This way, I can aggregate on failure flags and achieve my goal.
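To illustrate what "aggregate on failure flags" means here, a minimal plain-Python sketch of the grouping logic follows. It is not the actual DAG code: the function name `partition_by_failure`, the `STEPS` tuple, the `id` field, and the `step_2_failed_flag` / `step_3_failed_flag` keys are assumptions that follow the pattern of `step_1_failed_flag` above. Inside Airflow, this logic would presumably live in a downstream task that still runs after upstream failures (for example with a trigger rule such as `all_done`).

```python
from collections import defaultdict
from typing import Any, Iterable

# Hypothetical step names; only "step_1_failed_flag" appears in the snippet above,
# the other flag keys are assumed to follow the same naming pattern.
STEPS = ("step_1", "step_2", "step_3")


def partition_by_failure(items: Iterable[dict[str, Any]]) -> dict[str, list[dict[str, Any]]]:
    """Group items by the first step whose failure flag is set."""
    buckets: dict[str, list[dict[str, Any]]] = defaultdict(list)
    for item in items:
        # Find the first step that marked this item as failed, if any.
        failed_step = next(
            (step for step in STEPS if item.get(f"{step}_failed_flag")),
            None,
        )
        buckets[failed_step or "succeeded"].append(item)
    return dict(buckets)


# Example input shaped like the values pushed to XCom in the workaround.
pulled = [
    {"id": 1},
    {"id": 2, "step_1_failed_flag": True, "error": "An error message"},
    {"id": 3, "step_2_failed_flag": True, "error": "An error message"},
]
grouped = partition_by_failure(pulled)
```

Each bucket could then be handed to the matching failure handler, which receives the enriched items rather than an empty result.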
But this workaround easily gets complicated and hard to maintain.

## My question:

Is there an Airflow-native way to handle this situation, or is this pattern the idiomatic solution here?

Thanks in advance for any guidance!

GitHub link: https://github.com/apache/airflow/discussions/59221
