GitHub user lezaf created a discussion: Orchestration of multi-step workflow 
failures handlers

Hey everyone, 👋

I'm working on a dynamic Airflow DAG where each item goes through a sequence of 
mapped tasks inside a task group (e.g., step_1 -> step_2 -> step_3). Any step 
may fail for specific items, and when that happens I want to route the original 
item to a corresponding failure handler, e.g. to perform some compensation 
steps. Below is an outline of the desired DAG:

<img width="1077" height="389" alt="image" 
src="https://github.com/user-attachments/assets/b4936218-eb24-4fb4-908f-3c604ba82156"
 />


And here is a simplified part of the underlying code:

```py
# Assuming Airflow 2.x import paths; Airflow 3 exposes these in airflow.sdk
from airflow.decorators import task, task_group


@task_group
def processing_group(item):
    @task
    def step_1(item):
        if should_fail(item):
            raise ValueError("step 1 failed")
        return item

    @task(trigger_rule="all_failed")
    def step_1_fail_handler(step1_output):
        # <--- I want this task to receive the original input
        pass

    # rest of steps implementation

    step1_res = step_1(item)
    step1_failed = step_1_fail_handler(step1_res)

    # rest of steps wiring

    return {
        "step1_failed": step1_failed,
        "step2_failed": step2_failed,
        "step3_failed": step3_failed,
        "step3_success": step3_res,
    }
```

## Problem

When the mapped task step_1 fails, it produces no output (nothing is returned 
to downstream tasks), so the failure handler cannot access the original item 
(or, even better, an enriched version of it, as my requirements call for) via 
the step result.
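To make the goal concrete, here is a plain-Python sketch of the routing I'm after, with no Airflow involved. The `run_with_handler` helper, the `bad` key used by `should_fail`, and the sample items are hypothetical and only serve the illustration; the point is that the handler receives the original input together with the error.

```python
def should_fail(item):
    # Hypothetical failure condition, for illustration only.
    return item.get("bad", False)


def step_1(item):
    if should_fail(item):
        raise ValueError("step 1 failed")
    return item


def step_1_fail_handler(item, exc):
    # Compensation logic would go here; the handler sees the original input.
    return {**item, "error": str(exc)}


def run_with_handler(step, handler, item):
    """Run `step`; on failure, pass the original item and the error to `handler`."""
    try:
        return {"ok": True, "value": step(item)}
    except Exception as exc:
        return {"ok": False, "value": handler(item, exc)}


items = [{"id": 1}, {"id": 2, "bad": True}]
results = [run_with_handler(step_1, step_1_fail_handler, it) for it in items]
```

In Airflow the difficulty is that the `except` branch has no direct equivalent: once the task raises, there is no return value for the handler task to consume.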

I have settled on a workaround that uses the _current context_ and _xcom_push_, 
something like:

```py
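# Assuming Airflow 2.x import paths; Airflow 3 exposes these in airflow.sdk
from airflow.decorators import task, task_group
from airflow.operators.python import get_current_context


@task_group
def processing_group(item):
    @task
    def step_1(item):
        if should_fail(item):
            # ============ Context manipulation (start) ============
            # Push the enriched item manually, since raising skips the return
            context = get_current_context()
            context['ti'].xcom_push(
                key='return_value',
                value={
                    **item,
                    "step_1_failed_flag": True,
                    "error": "An error message",
                }
            )
            # ============ Context manipulation (end) ============
            raise ValueError("step 1 failed")
        return item

    step1_res = step_1(item)

    # rest

    return {
        "step1_res": step1_res,
        "step2_res": step2_res,
        "step3_res": step3_res,
    }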
```

This way, I can aggregate on the failure flags and achieve my goal, but it 
quickly gets complicated and hard to maintain.
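For reference, the aggregation on failure flags looks roughly like the sketch below. It is plain Python rather than Airflow code; `partition_by_failure` and the sample results are hypothetical, and only the `step_1_failed_flag` and `error` keys come from the workaround above.

```python
def partition_by_failure(results, flag="step_1_failed_flag"):
    """Split step results into (succeeded, failed) based on a failure flag."""
    succeeded, failed = [], []
    for result in results:
        # A missing flag is treated as success.
        (failed if result.get(flag) else succeeded).append(result)
    return succeeded, failed


# Hypothetical collected outputs of the mapped step_1 task.
step1_results = [
    {"id": 1},
    {"id": 2, "step_1_failed_flag": True, "error": "An error message"},
    {"id": 3},
]

succeeded, failed = partition_by_failure(step1_results)
```

Every step needs its own flag and its own partitioning like this, which is where the maintenance burden comes from.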

## My question:

Is there an Airflow-native way to handle this situation, or is this workaround 
the idiomatic solution here?

Thanks in advance for any guidance! 🙏

GitHub link: https://github.com/apache/airflow/discussions/59221
