fedemgp opened a new issue, #42164: URL: https://github.com/apache/airflow/issues/42164
### Apache Airflow version

main (development)

### If "Other Airflow 2 version" selected, which one?

_No response_

### What happened?

The `__exit__` method of the `DatabricksWorkflowTaskGroup` operator can raise an exception (it does so if you instantiate an operator class that cannot be serialized to JSON, [here](https://github.com/apache/airflow/blob/73f7d891583b023239c73a926cf1fdc69069176b/airflow/providers/databricks/operators/databricks_workflow.py#L306-L308)). If something inside the method raises, the [`__exit__` method of the superclass](https://github.com/apache/airflow/blob/73f7d891583b023239c73a926cf1fdc69069176b/airflow/providers/databricks/operators/databricks_workflow.py#L317) is never called; that method is in charge of [popping](https://github.com/apache/airflow/blob/73f7d891583b023239c73a926cf1fdc69069176b/airflow/utils/task_group.py#L360-L361) a [class attribute holding the group being executed](https://github.com/apache/airflow/blob/73f7d891583b023239c73a926cf1fdc69069176b/airflow/utils/task_group.py#L669-L677). Any further workflow you instantiate inside that DAG (or another DAG) will then raise an exception with the message `RuntimeError: ('Cannot mix TaskGroups from different DAGs: %s and %s')`.

### What you think should happen instead?

I see two possible solutions, one more feasible than the other:

1. Move the call to the superclass' `__exit__` method to the beginning of this method (I am not 100% sure this will work, because the code inside may rely on the context attribute still being set).
2. Wrap the code in a try/finally block, which ensures the superclass' `__exit__` method is called no matter what at the end of execution.
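The second option can be sketched with a minimal, self-contained example (the class names `BaseGroup` and `WorkflowGroup` here are stand-ins for `TaskGroup` and `DatabricksWorkflowTaskGroup`, and the stack mimics `TaskGroupContext`; the real fix would live in `DatabricksWorkflowTaskGroup.__exit__`):

```python
class BaseGroup:
    # Stand-in for TaskGroupContext's context-managed group stack.
    _context_stack = []

    def __enter__(self):
        BaseGroup._context_stack.append(self)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Superclass cleanup: pop the group from the context stack.
        BaseGroup._context_stack.pop()


class WorkflowGroup(BaseGroup):
    def __exit__(self, exc_type, exc_val, exc_tb):
        try:
            # Subclass logic that may raise, e.g. the task-serialization check.
            raise RuntimeError("task does not support conversion")
        finally:
            # With try/finally, the superclass cleanup runs even on failure.
            super().__exit__(exc_type, exc_val, exc_tb)


try:
    with WorkflowGroup():
        pass
except RuntimeError:
    pass

# The context stack is clean, so later groups can be created safely.
assert BaseGroup._context_stack == []
```

Without the `finally`, the raise would skip the `super().__exit__()` call and the stale group would remain on the stack, which is exactly the failure mode described above.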
### How to reproduce

Here is a simple unit test where you can check the issue:

```python
import pytest

from airflow import DAG
from airflow.exceptions import AirflowException
from airflow.operators.python import PythonOperator
from airflow.providers.databricks.operators.databricks_workflow import DatabricksWorkflowTaskGroup
from airflow.utils.dates import days_ago
from airflow.utils.task_group import TaskGroupContext


def test_example():
    dag = DAG(
        dag_id="test_workflows",
        start_date=days_ago(1),
        schedule_interval=None,
    )
    workflow = DatabricksWorkflowTaskGroup(databricks_conn_id="default", group_id="tasks", dag=dag)

    with pytest.raises(AirflowException) as e_info:
        with workflow:
            # Force an exception by instantiating an operator that is not
            # supported by Databricks workflows
            op = PythonOperator(python_callable=lambda x: x, task_id="foo")
    assert str(e_info.value) == "Task tasks.foo does not support conversion to databricks workflow task."

    # Here the test will fail because TaskGroupContext didn't pop the instance
    group = TaskGroupContext.pop_context_managed_task_group()
    assert group is None
```

### Operating System

macOS Sonoma 14.6.1

### Versions of Apache Airflow Providers

apache-airflow-providers-common-compat==1.1.0
apache-airflow-providers-common-io==1.4.0
apache-airflow-providers-common-sql==1.15.0
apache-airflow-providers-databricks==6.8.0
apache-airflow-providers-fab==1.2.2
apache-airflow-providers-ftp==3.10.1
apache-airflow-providers-google==10.9.0
apache-airflow-providers-http==4.12.0
apache-airflow-providers-imap==3.6.1
apache-airflow-providers-smtp==1.7.1
apache-airflow-providers-sqlite==3.8.2

### Deployment

Amazon (AWS) MWAA

### Deployment details

_No response_

### Anything else?

_No response_

### Are you willing to submit PR?

- [X] Yes I am willing to submit a PR!
### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
