npai96 opened a new issue, #31944: URL: https://github.com/apache/airflow/issues/31944
### Apache Airflow version

2.6.1

### What happened

**Context**
- Running DAGs within a `unittest` framework to validate successful depth-first execution using `dag.test()`
- Depth-first execution is implemented using the `TaskFlow` `.expand()` operator over a `task_group`

**What Happened**
- When trying to assert on failed task states of intermediate `.expand()` "iterations", the entire DAG fails without allowing downstream tasks for subsequent "iterations" to continue (see the `run_dag` method in the toy DAG example)
- This works fine when running Backfill jobs using the deprecated `DebugExecutor` (see the `run_dag_debug_executor` method in the toy DAG example)
- Potentially related to [this PR](https://github.com/apache/airflow/pull/31541)?

### What you think should happen instead

Failed task "iterations" should not prevent downstream tasks from running.

### How to reproduce

**Toy DAG**

```python
import logging
from datetime import datetime
from typing import Any

from airflow.decorators import dag, task, task_group


@task
def get_data() -> list[Any]:
    # Purposely returns a value with an inconsistent data type
    return [1, "two", 3]


@task
def add_1(i: int) -> int:
    try:
        return i + 1
    except (TypeError, Exception) as err:
        raise Exception(err)


@task
def log_transformation(transformed_i: int) -> None:
    logging.info(f"Transformed number {transformed_i}")


@task_group(group_id="process_data")
def process_data(i: Any) -> None:
    transformed_i = add_1(i)
    log_transformation(transformed_i)


@dag(dag_id="depth_first_dag", start_date=datetime.now())
def depth_first_dag():
    all_data = get_data()
    process_data.expand(i=all_data)


depth_first_dag()
```

**Failing Unit Tests**

```python
"""
Usage: python3 -m unittest test_depth_first_dag.py
"""
import unittest
from datetime import datetime, timezone
from typing import Any, Optional

# imported `pandas` because `freezegun.freeze_time` depends on it
# import pandas  # type: ignore
from airflow.exceptions import BackfillUnfinished
from airflow.executors.debug_executor import DebugExecutor
from airflow.models import DagBag
from airflow.models.dag import DAG
from airflow.models.taskinstance import TaskInstance
from freezegun import freeze_time


#
# Helper Functions
#
def run_dag(
    dag_name: str,
    execution_date: Optional[datetime] = None,
    conf: Optional[dict[str, Any]] = None,
) -> DAG:
    """
    Runs DAG in a "dagtest" using `.test()`

    Modeled on `dag_test` in
    https://github.com/apache/airflow/blob/2.5.3/airflow/cli/commands/dag_command.py#L440
    """
    dag = DagBag(include_examples=False, safe_mode=False).get_dag(dag_name)
    execution_date = datetime.now(timezone.utc) if not execution_date else execution_date
    dag.test(execution_date=execution_date, run_conf=conf)
    return dag


def run_dag_debug_executor() -> DAG:
    """
    Runs a DAG in a test with a Backfill job using `DebugExecutor` -- SIGNIFICANTLY slower
    """
    execution_date = datetime.now(timezone.utc)
    dag_bag = DagBag(include_examples=False, safe_mode=False)
    dag = dag_bag.get_dag("depth_first_dag")
    dag.clear(
        task_ids=dag.task_ids,
        start_date=datetime(1970, 1, 1, tzinfo=timezone.utc),
        end_date=datetime(2038, 12, 31, tzinfo=timezone.utc),
        dag_bag=dag_bag,
    )
    dag.run(
        start_date=execution_date,
        end_date=datetime.now(timezone.utc),
        executor=DebugExecutor(),  # type: ignore
        run_at_least_once=True,
        conf=None,
        verbose=True,
        disable_retry=True,
    )
    return dag


def get_task_group_states(dag: DAG) -> dict[str, list[str]]:
    downstream_task_ids = ["process_data.add_1", "process_data.log_transformation"]
    task_instance_states: dict[str, list[str]] = {}
    # Each task (even the skipped ones) should have 3 mapped instances (1 for each input)
    for task_id in downstream_task_ids:
        task_instances = [
            TaskInstance(dag.task_dict[task_id], execution_date=datetime.now(), map_index=i)
            for i in range(0, 3)
        ]
        task_instance_states[task_id] = [task_instances[i].current_state() for i in range(0, 3)]
    return task_instance_states


#
# Unit Tests
#
class TestDepthFirstDAG(unittest.TestCase):
    @freeze_time("2023-06-02 10:00:00+00:00")
    def test_depth_first_dag_debug_executor(self):
        # Get DAG instance
        df_dag = DagBag(include_examples=False, safe_mode=False).get_dag("depth_first_dag")

        # Run DAG using `DebugExecutor` with Backfill jobs
        with self.assertRaises(BackfillUnfinished):
            run_dag_debug_executor()

        # Assert on task instance states
        task_instance_states = get_task_group_states(dag=df_dag)
        self.assertEqual(["success", "failed", "success"], task_instance_states["process_data.add_1"])
        self.assertEqual(
            ["success", "upstream_failed", "success"], task_instance_states["process_data.log_transformation"]
        )

    @freeze_time("2023-06-04 10:00:00+00:00")
    def test_depth_first_dag_dagtest(self):
        # Get DAG instance
        df_dag = DagBag(include_examples=False, safe_mode=False).get_dag("depth_first_dag")

        # Run DAG using `dag.test()` -- this fails the entire DAG
        df_dag = run_dag("depth_first_dag")

        # Assert on task instance states
        task_instance_states = get_task_group_states(dag=df_dag)
        self.assertEqual(["success", "failed", "success"], task_instance_states["process_data.add_1"])
        self.assertEqual(["success", "skipped", "success"], task_instance_states["process_data.log_transformation"])
```

**Explanation**
- The toy DAG involves 3 tasks: 1. `get_data`, 2. `add_1`, 3.
`log_transformation`, where tasks 2 and 3 are wrapped in a `task_group` named `process_data`
- `get_data` retrieves a list of 3 items (`1`, `"two"`, and `3`) and passes them to `process_data`
- The goal is to assert on the states of each task instance to validate that the tasks for Iteration 3 continue to run even though Iteration 2 fails

**Behaviour**

| Iteration # | Input Value | Expected Behaviour | Observed Behaviour |
| :--- | :--- | :--- | :--- |
| 1 | `1` | All tasks succeed | All tasks succeed |
| 2 | `"two"` | `process_data.add_1` should fail with a `TypeError` and skip the downstream task `log_transformation` | **`process_data.add_1` failure causes the entire DAG to fail with `Exception`, rather than just the task instance for this iteration** |
| 3 | `3` | All tasks should succeed | No tasks are run |

### Operating System

macOS Ventura Version 13.4 (22F66)

### Versions of Apache Airflow Providers

```python
apache-airflow-providers-common-sql==1.4.0
apache-airflow-providers-ftp==3.3.1
apache-airflow-providers-google==10.0.0
apache-airflow-providers-http==4.3.0
apache-airflow-providers-imap==3.1.1
apache-airflow-providers-postgres==5.4.0
apache-airflow-providers-sendgrid==3.1.0
apache-airflow-providers-slack==7.2.0
apache-airflow-providers-sqlite==3.3.2
```

### Deployment

Other 3rd-party Helm chart

### Deployment details

Helm Chart: Community helm chart, version airflow-stable/airflow 8.7.1
Airflow: apache/airflow:2.6.1-python3.10

### Anything else

_No response_

### Are you willing to submit PR?

- [ ] Yes I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
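As a sanity check, the failure mode that drives Iteration 2 can be reproduced in plain Python with no Airflow involved (this snippet just mirrors the body of the `add_1` task; the `results` dict is only for illustration):

```python
def add_1(i):
    # Same body as the toy DAG's `add_1` task: a str input triggers a
    # TypeError, which is re-raised as a generic Exception.
    try:
        return i + 1
    except (TypeError, Exception) as err:
        raise Exception(err)


results = {}
for i in [1, "two", 3]:
    try:
        results[i] = add_1(i)
    except Exception:
        results[i] = "failed"

print(results)  # → {1: 2, 'two': 'failed', 3: 4}
```

Only the middle input fails; inputs `1` and `3` succeed on their own, which is why the remaining mapped iterations should still be runnable.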
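To make the expected per-iteration isolation concrete, here is a minimal plain-Python simulation of the depth-first semantics the Behaviour table above describes. This is an illustration of the expected state propagation, not Airflow's actual scheduling code, and `simulate` is a hypothetical helper:

```python
# Simulate the EXPECTED depth-first behaviour: a failing mapped iteration
# should only mark its own downstream task as upstream_failed, leaving the
# other iterations free to run to completion.
def simulate(inputs):
    states = {"process_data.add_1": [], "process_data.log_transformation": []}
    for i in inputs:
        try:
            i + 1  # the same operation `add_1` performs
            states["process_data.add_1"].append("success")
            states["process_data.log_transformation"].append("success")
        except TypeError:
            states["process_data.add_1"].append("failed")
            states["process_data.log_transformation"].append("upstream_failed")
    return states


print(simulate([1, "two", 3]))
# → {'process_data.add_1': ['success', 'failed', 'success'],
#    'process_data.log_transformation': ['success', 'upstream_failed', 'success']}
```

These are exactly the state lists the `DebugExecutor` unit test asserts on, and what `dag.test()` is expected to produce as well.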
