npai96 opened a new issue, #31944:
URL: https://github.com/apache/airflow/issues/31944

   ### Apache Airflow version
   
   2.6.1
   
   ### What happened
   
   **Context** 
   - Running DAGs within a `unittest` framework to validate successful depth-first execution using `dag.test()`
   - Depth-first execution is implemented by mapping the TaskFlow `.expand()` method over a `task_group`
   
   **What Happened**
   - When trying to assert on failed task states of intermediate `.expand()` "iterations", I observe that the entire DAG fails without allowing downstream tasks for subsequent "iterations" to continue (see the `run_dag` helper in the unit tests below)
   - This works fine when running a Backfill job with the deprecated `DebugExecutor` (see the `run_dag_debug_executor` helper in the unit tests below)
   
   - Potentially related to [this PR](https://github.com/apache/airflow/pull/31541)?
   
   
   ### What you think should happen instead
   
   Failed task "iterations" should not prevent downstream tasks from running 
   
   ### How to reproduce
   
   **Toy DAG**
   
   ```python
   import logging
   from datetime import datetime
   from typing import Any
   
   from airflow.decorators import dag, task, task_group
   
   
   @task
   def get_data() -> list[Any]:
       # Purposely returns a value with inconsistent data type
       return [1, "two", 3]
   
   
   @task
   def add_1(i: int) -> int:
       try:
           return i + 1
       except TypeError as err:
           # Re-raise so the mapped task instance for this input fails
           raise Exception(err) from err
   
   
   @task
   def log_transformation(transformed_i: int) -> None:
       logging.info(f"Transformed number {transformed_i}")
   
   
   @task_group(group_id="process_data")
   def process_data(i: Any) -> None:
       transformed_i = add_1(i)
       log_transformation(transformed_i)
   
   
   @dag(dag_id="depth_first_dag", start_date=datetime.now())
   def depth_first_dag():
       all_data = get_data()
       process_data.expand(i=all_data)
   
   
   depth_first_dag()
   ```
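
   For quick iteration outside the test suite, the toy DAG can also be run standalone; a minimal sketch (the `__main__` guard and the `toy_dag` name are my own additions, appended to the DAG file):

   ```python
   if __name__ == "__main__":
       # `dag.test()` runs every task in-process without a scheduler,
       # which is where the behaviour described above reproduces
       toy_dag = depth_first_dag()
       toy_dag.test()
   ```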
   
   
   
   **Failing Unit Tests**
   ```python
   """
   Usage: python3 -m unittest test_depth_first_dag.py
   """
   
   import unittest
   from datetime import datetime, timezone
   from typing import Any, Optional
   
   # imported `pandas` because `freezegun.freeze_time` depends on it
   # import pandas  # type: ignore
   from airflow.exceptions import BackfillUnfinished
   from airflow.executors.debug_executor import DebugExecutor
   from airflow.models import DagBag
   from airflow.models.dag import DAG
   from airflow.models.taskinstance import TaskInstance
   from freezegun import freeze_time
   
   
   #
   # Helper Functions
   #
   def run_dag(
       dag_name: str,
       execution_date: Optional[datetime] = None,
       conf: Optional[dict[str, Any]] = None,
   ) -> DAG:
       """
       Runs DAG in a "dagtest" using `.test()`
       Modeled on `dag_test` in
       https://github.com/apache/airflow/blob/2.5.3/airflow/cli/commands/dag_command.py#L440
       """
       dag = DagBag(include_examples=False, safe_mode=False).get_dag(dag_name)
       execution_date = datetime.now(timezone.utc) if not execution_date else execution_date
       dag.test(execution_date=execution_date, run_conf=conf)
       return dag
   
   
   def run_dag_debug_executor() -> DAG:
       """
       Runs a DAG in a test with a Backfill job using `DebugExecutor` -- SIGNIFICANTLY slower
       """
       execution_date = datetime.now(timezone.utc)
       dag_bag = DagBag(include_examples=False, safe_mode=False)
       dag = dag_bag.get_dag("depth_first_dag")
       dag.clear(
           task_ids=dag.task_ids,
           start_date=datetime(1970, 1, 1, tzinfo=timezone.utc),
           end_date=datetime(2038, 12, 31, tzinfo=timezone.utc),
           dag_bag=dag_bag,
       )
       dag.run(
           start_date=execution_date,
           end_date=datetime.now(timezone.utc),
           executor=DebugExecutor(),  # type: ignore
           run_at_least_once=True,
           conf=None,
           verbose=True,
           disable_retry=True,
       )
       return dag
   
   
   def get_task_group_states(dag: DAG) -> dict[str, list[str]]:
       downstream_task_ids = ["process_data.add_1", "process_data.log_transformation"]
       task_instance_states: dict[str, list[str]] = {}
       # Each task (even the skipped ones) should have 3 mapped instances (1 for each input)
       for task_id in downstream_task_ids:
           task_instances = [
               TaskInstance(dag.task_dict[task_id], execution_date=datetime.now(), map_index=i)
               for i in range(3)
           ]
           task_instance_states[task_id] = [ti.current_state() for ti in task_instances]
       return task_instance_states
   
   
   #
   # Unit Tests
   #
   class TestDepthFirstDAG(unittest.TestCase):
       @freeze_time("2023-06-02 10:00:00+00:00")
       def test_depth_first_dag_debug_executor(self):
           # Get DAG instance
           df_dag = DagBag(include_examples=False, safe_mode=False).get_dag("depth_first_dag")
   
           # Run DAG using `DebugExecutor` with Backfill jobs
           with self.assertRaises(BackfillUnfinished):
               run_dag_debug_executor()
   
           # Assert on task instance states
           task_instance_states = get_task_group_states(dag=df_dag)
           self.assertEqual(["success", "failed", "success"], 
task_instance_states["process_data.add_1"])
           self.assertEqual(
               ["success", "upstream_failed", "success"], 
task_instance_states["process_data.log_transformation"]
           )
   
       @freeze_time("2023-06-04 10:00:00+00:00")
       def test_depth_first_dag_dagtest(self):
           # Run DAG using `dag.test()` -- this fails the entire DAG
           df_dag = run_dag("depth_first_dag")
   
           # Assert on task instance states
           task_instance_states = get_task_group_states(dag=df_dag)
           self.assertEqual(["success", "failed", "success"], 
task_instance_states["process_data.add_1"])
           self.assertEqual(["success", "skipped", "success"], 
task_instance_states["process_data.log_transformation"])
   
   ```
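
   As an aside, rather than constructing `TaskInstance` objects by hand in `get_task_group_states`, the states can also be read back from the metadata database through the `DagRun`. A sketch (assumes the test database is reachable and that the run above created at least one DagRun; `get_states_from_dagrun` is a hypothetical helper name):

   ```python
   from airflow.models import DagRun


   def get_states_from_dagrun(dag_id: str) -> dict[tuple[str, int], str]:
       """Map (task_id, map_index) -> state for the most recent DagRun of `dag_id`."""
       dag_run = DagRun.find(dag_id=dag_id)[-1]  # most recent run; assumes one exists
       return {(ti.task_id, ti.map_index): ti.state for ti in dag_run.get_task_instances()}
   ```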
   
   **Explanation**
   - The toy DAG involves 3 tasks:
           1. `get_data`
           2. `add_1`
           3. `log_transformation`
       where 2 and 3 are wrapped in a `task_group` named `process_data`
   - `get_data` returns a list of 3 elements -- `1`, `"two"`, and `3` -- and passes them to `process_data`
   - What I am trying to accomplish is to assert on the state of each task instance, validating that the tasks for iteration 3 continue to run even though iteration 2 fails
   
   **Behaviour**
   | Iteration # | Input Value | Expected Behaviour | Observed Behaviour |
   | :--- | :--- | :--- | :--- |
   | 1 | `1` | All tasks succeed | All tasks succeed |
   | 2 | `"two"` | `process_data.add_1` should fail with a `TypeError` and skip the downstream task `log_transformation` | **`process_data.add_1` failure causes the entire DAG to fail with `Exception` rather than just the task instance for this iteration** |
   | 3 | `3` | All tasks should succeed | No tasks are run |
   
   
   ### Operating System
   
   macOS Ventura 13.4 (22F66)
   
   ### Versions of Apache Airflow Providers
   
   ```python
   apache-airflow-providers-common-sql==1.4.0
   apache-airflow-providers-ftp==3.3.1
   apache-airflow-providers-google==10.0.0
   apache-airflow-providers-http==4.3.0
   apache-airflow-providers-imap==3.1.1
   apache-airflow-providers-postgres==5.4.0
   apache-airflow-providers-sendgrid==3.1.0
   apache-airflow-providers-slack==7.2.0
   apache-airflow-providers-sqlite==3.3.2
   ```
   
   ### Deployment
   
   Other 3rd-party Helm chart
   
   ### Deployment details
   
   Helm Chart: Community helm chart version airflow-stable/airflow 8.7.1
   Airflow: apache/airflow:2.6.1-python3.10
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

