albertwangnz opened a new issue, #51910:
URL: https://github.com/apache/airflow/issues/51910

   ### Apache Airflow Provider(s)
   
   databricks
   
   ### Versions of Apache Airflow Providers
   
   7.4.0
   
   ### Apache Airflow version
   
   3.0.2
   
   ### Operating System
   
   Ubuntu 24.04.2 LTS
   
   ### Deployment
   
   Virtualenv installation
   
   ### Deployment details
   
   - **Deployment Type**: Virtualenv installation
   - **Operating System**: Ubuntu 24.04.2 LTS
   - **Python Version**: 3.12.3
   - **Airflow Version**: 3.0.2
   - **Databricks Provider Version**: 7.4.0
   - **Database Backend**: PostgreSQL 16
   - **Secrets Backend**: Microsoft Azure Key Vault
   - **Authentication**: Flask AppBuilder (FAB) with Microsoft Entra ID (SSO)
   - **SSL Configuration**: Enabled with custom certificates
   - **Timezone**: Pacific/Auckland
   - **Airflow Services Management**: systemd unit files for `api-server`, 
`scheduler`, `dag-processor`, and `triggerer`
   - **Custom Configuration Highlights**:
     - Airflow configuration (`airflow.cfg`) includes:
       - `sql_alchemy_conn_secret` for DB connection string
       - Azure Key Vault integration for secrets
       - SSL cert/key paths
       - FAB auth manager
     - Environment variables for Azure credentials (`AZURE_CLIENT_ID`, 
`AZURE_TENANT_ID`, `AZURE_CLIENT_SECRET`)
     - Custom `webserver_config.py` for SSO
     - Firewall configured to allow port 8443
   
   Installation followed a manual setup process using a Python virtual 
environment and systemd for service orchestration. All dependencies were 
installed using pip with Airflow constraints for version compatibility.
   
   ### What happened
   
   While running a DAG that uses `DatabricksRunNowOperator` with `deferrable=True` in Airflow 3.0.2 and Databricks Provider 7.4.0, the task successfully triggered a Databricks job and completed with a `SUCCESS` state. However, the task log also reported errors about unclosed `aiohttp` client sessions and connectors:
   
   ```
   [2025-06-19, 02:21:23] ERROR - Unclosed client session
   client_session: <aiohttp.client.ClientSession object at 0x7784a33f1250>: source="asyncio"
   [2025-06-19, 02:21:23] ERROR - Unclosed connector
   connections: ['deque([(<aiohttp.client_proto.ResponseHandler object at 0x7784a33c6990>, 341232.543125349)])']
   connector: <aiohttp.connector.TCPConnector object at 0x7784a33f1880>: source="asyncio"
   ```
   
   These errors appeared shortly after the task was deferred, while `DatabricksExecutionTrigger` was polling the job status. Although the job completed successfully, they suggest a resource leak or improper cleanup of async HTTP sessions in the provider code.
   
   This issue occurred in a production-like environment using:
   - Airflow 3.0.2 (virtualenv installation)
   - Python 3.12.3
   - Ubuntu 24.04.2 LTS
   - Databricks Provider 7.4.0
   - PostgreSQL 16 as backend
   - Azure Key Vault as secrets backend
   
   No custom modifications were made to the operator or trigger logic. The DAG 
was manually triggered and ran without retries.
   
   ### What you think should happen instead
   
   The task completes successfully, but the `Unclosed client session` and `Unclosed connector` errors in the logs indicate improper resource management in the Databricks provider.
   
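   These errors suggest that the `aiohttp.ClientSession` and `TCPConnector` objects used during async polling in `DatabricksExecutionTrigger` are not closed before they are garbage-collected. aiohttp reports `Unclosed client session` and `Unclosed connector` through the asyncio exception handler when a session or connector is finalized while still open. Over time this could leak connections and memory or degrade performance, especially in long-running Airflow environments.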
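The `source="asyncio"` on these entries comes from how the message is produced. The event loop's exception handler emits it, not a Databricks provider logger. Below is a minimal stdlib-only sketch of that mechanism. It assumes the behaviour mirrors aiohttp's, where an object that was never closed reports itself from its finalizer via `loop.call_exception_handler`. `HypotheticalSession` is an illustrative stand-in, not aiohttp's class.

```python
import asyncio


class HypotheticalSession:
    """Hypothetical stand-in for an HTTP client session (not aiohttp's API).

    It reports itself through the event loop's exception handler if it is
    garbage-collected without being closed.
    """

    def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
        self._loop = loop
        self.closed = False

    async def close(self) -> None:
        self.closed = True

    def __del__(self) -> None:
        if not self.closed:
            # Same context shape that the default handler renders as
            # "Unclosed client session" followed by "client_session: <...>".
            self._loop.call_exception_handler(
                {"message": "Unclosed client session", "client_session": self}
            )


def run_demo() -> list[str]:
    reported: list[str] = []

    async def main() -> None:
        loop = asyncio.get_running_loop()
        # Capture what the default handler would otherwise log at ERROR level.
        loop.set_exception_handler(lambda _loop, ctx: reported.append(ctx["message"]))

        leaked = HypotheticalSession(loop)
        del leaked  # dropped without close(): the finalizer reports it

        closed = HypotheticalSession(loop)
        await closed.close()
        del closed  # closed properly: nothing is reported

    asyncio.run(main())
    return reported


print(run_demo())  # ['Unclosed client session']
```

Only the session dropped without `close()` is reported. This matches the pattern in the log, where the warning surfaces at finalization time rather than at the point where the session was created.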
   
   Since the operator is marked as `deferrable=True`, it should follow best 
practices for async resource cleanup. I expect that:
   - All async HTTP sessions and connectors are properly closed after use.
   - No warnings or errors related to unclosed resources appear in the logs.
   - The operator and trigger behave cleanly and efficiently without leaving 
behind dangling resources.
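These expectations amount to tying each session's lifetime to the trigger's polling loop. Below is a minimal stdlib-only sketch for a trigger-style async generator, similar in shape to a `BaseTrigger.run()` implementation that yields a final event. `HypotheticalSession`, `poll_until_done`, and the poll-count logic are illustrative assumptions, not the provider's code. The `async with` block guarantees `close()` runs whether polling ends normally or the generator is cancelled while sleeping.

```python
import asyncio
import contextlib
from collections.abc import AsyncIterator


class HypotheticalSession:
    """Hypothetical async HTTP session; it only records whether it was closed."""

    def __init__(self) -> None:
        self.closed = False

    async def get_state(self, poll: int) -> str:
        await asyncio.sleep(0)  # stands in for a network round trip
        return "TERMINATED" if poll >= 2 else "RUNNING"

    async def close(self) -> None:
        self.closed = True

    async def __aenter__(self) -> "HypotheticalSession":
        return self

    async def __aexit__(self, *exc_info: object) -> None:
        await self.close()


async def poll_until_done(
    session: HypotheticalSession, interval: float = 0.0
) -> AsyncIterator[str]:
    """Trigger-style async generator that owns one session for the whole loop."""
    async with session:  # closed on normal exit, on error, and on cancellation
        poll = 0
        while True:
            state = await session.get_state(poll)
            if state == "TERMINATED":
                yield state  # analogous to yielding the final trigger event
                return
            poll += 1
            await asyncio.sleep(interval)


async def drain(gen: AsyncIterator[str]) -> list[str]:
    return [item async for item in gen]


async def demo() -> tuple[list[str], bool, bool]:
    finished = HypotheticalSession()
    events = await drain(poll_until_done(finished))

    # Simulate the trigger being cancelled while it sleeps between polls.
    cancelled = HypotheticalSession()
    task = asyncio.create_task(drain(poll_until_done(cancelled, interval=10)))
    await asyncio.sleep(0.01)
    task.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await task
    return events, finished.closed, cancelled.closed


print(asyncio.run(demo()))  # (['TERMINATED'], True, True)
```

Both sessions end up closed, including the one whose polling loop was cancelled mid-sleep. That is the property the expectations above ask for.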
   
   This behavior is erroneous because it violates expected resource lifecycle 
management in asynchronous Python code and could impact system stability.
   
   ### How to reproduce
   
   To reproduce the issue, follow these steps in a clean Airflow 3.0.2 
environment with the Databricks Provider version 7.4.0:
   
   1. **Set up Airflow**:
      - Install Airflow 3.0.2 in a Python 3.12 virtual environment.
      - Use PostgreSQL 16 as the metadata database.
      - Configure Airflow to use Microsoft Azure Key Vault as the secrets 
backend.
      - Enable SSL and set up Flask AppBuilder (FAB) authentication.
   
   2. **Create a DAG** with the following characteristics:
      - Uses `DatabricksRunNowOperator`
      - Sets `deferrable=True`
      - Specifies a valid `job_id` for a Databricks job
      - Uses a valid `databricks_conn_id` pointing to an Azure Databricks 
workspace
   
     My DAG snippet:
   ```python
   from datetime import datetime, timedelta
   
   from airflow.models import DAG
   
   from airflow.providers.databricks.operators.databricks import (
       DatabricksRunNowOperator,
   )
   
   with DAG(
       dag_id="sit_trigger_databricks_job_run",
       description="System Integration Testing - Trigger a Databricks job run",
       start_date=datetime.now() - timedelta(days=1),
       schedule=None,  # timedelta(days=1),
       catchup=False,
       default_args={
           "retries": 0,
           "retry_delay": timedelta(minutes=1),
           "databricks_conn_id": "databricks-default2",
       },
       tags=["sit", "databricks"],
   ) as dag:
   
       trigger_databricks_job_run = DatabricksRunNowOperator(
           task_id="trigger_databricks_job_run",
           job_id=604918372746183,
           deferrable=True,
           execution_timeout=timedelta(hours=3),
       )
   ```
   
   3. **Trigger the DAG manually** via the Airflow UI or CLI.
   
   4. **Observe the logs** of the task. You will see the following error 
messages even though the job completes successfully:
      ```
      ERROR - Unclosed client session
      ERROR - Unclosed connector
      ```
   
   This issue reproduces consistently when Databricks provider operators run in deferrable mode, as in the DAG above.
   
   ### Anything else
   
   This issue occurs **every time** the DAG is triggered using 
`DatabricksRunNowOperator` with `deferrable=True`. The error messages 
consistently appear during the polling phase managed by 
`DatabricksExecutionTrigger`.
   
   Here is the full task log:
   
   ```log
   [2025-06-19, 02:21:16] INFO - DAG bundles loaded: dags-folder: source="airflow.dag_processing.bundles.manager.DagBundlesManager"
   [2025-06-19, 02:21:16] INFO - Filling up the DagBag from /home/airflow/airflow/dags/sit_trigger_databricks_job_run.py: source="airflow.models.dagbag.DagBag"
   [2025-06-19, 02:21:16] INFO - Environment is configured for ClientSecretCredential: source="azure.identity._credentials.environment"
   [2025-06-19, 02:21:16] INFO - ManagedIdentityCredential will use IMDS with client_id: 3a9f1c84-7b3d-4e2a-a9d1-8f6c3e2b7f90: source="azure.identity._credentials.managed_identity"
   [2025-06-19, 02:21:17] INFO - DefaultAzureCredential acquired a token from EnvironmentCredential: source="azure.identity._credentials.chained"
   [2025-06-19, 02:21:17] INFO - Connection Retrieved 'databricks-default2': source="airflow.hooks.base"
   [2025-06-19, 02:21:17] INFO - Existing AAD token is expired, or going to expire soon. Refreshing...: source="airflow.task.hooks.airflow.providers.databricks.hooks.databricks.DatabricksHook"
   [2025-06-19, 02:21:18] INFO - ClientSecretCredential.get_token succeeded: source="azure.identity._internal.get_token_mixin"
   [2025-06-19, 02:21:19] INFO - Run submitted with run_id: 748193847562019: source="airflow.task.operators.airflow.providers.databricks.operators.databricks.DatabricksRunNowOperator"
   [2025-06-19, 02:21:19] INFO - View run status, Spark UI, and logs at https://adb-8274910385627419.47.azuredatabricks.net/?o=9182736450192837#job/604918372746183/run/748193847562019: source="airflow.task.operators.airflow.providers.databricks.operators.databricks.DatabricksRunNowOperator"
   [2025-06-19, 02:21:20] INFO - Pausing task as DEFERRED. : dag_id="sit_trigger_databricks_job_run": task_id="trigger_databricks_job_run": run_id="manual__2025-06-19T02:21:11.927855+00:00": source="task"
   [2025-06-19, 02:21:20] INFO - trigger sit_trigger_databricks_job_run/manual__2025-06-19T02:21:11.927855+00:00/trigger_databricks_job_run/-1/1 (ID 15) starting
   [2025-06-19, 02:21:20] INFO - Environment is configured for ClientSecretCredential: source="azure.identity._credentials.environment"
   [2025-06-19, 02:21:20] INFO - ManagedIdentityCredential will use IMDS with client_id: 3a9f1c84-7b3d-4e2a-a9d1-8f6c3e2b7f90: source="azure.identity._credentials.managed_identity"
   [2025-06-19, 02:21:21] INFO - DefaultAzureCredential acquired a token from EnvironmentCredential: source="azure.identity._credentials.chained"
   [2025-06-19, 02:21:23] INFO - Connection Retrieved 'databricks-default2': source="airflow.hooks.base"
   [2025-06-19, 02:21:23] INFO - Existing AAD token is expired, or going to expire soon. Refreshing...: source="airflow.task.hooks.airflow.providers.databricks.hooks.databricks.DatabricksHook"
   [2025-06-19, 02:21:23] INFO - ClientSecretCredential.get_token succeeded: source="azure.identity.aio._internal.get_token_mixin"
   [2025-06-19, 02:21:23] ERROR - Unclosed client session
   client_session: <aiohttp.client.ClientSession object at 0x7784a33f1250>: source="asyncio"
   [2025-06-19, 02:21:23] ERROR - Unclosed connector
   connections: ['deque([(<aiohttp.client_proto.ResponseHandler object at 0x7784a33c6990>, 341232.543125349)])']
   connector: <aiohttp.connector.TCPConnector object at 0x7784a33f1880>: source="asyncio"
   [2025-06-19, 02:21:23] INFO - run-id 748193847562019 in run state {'life_cycle_state': 'RUNNING', 'result_state': '', 'state_message': ''}. sleeping for 30 seconds: source="airflow.providers.databricks.triggers.databricks.DatabricksExecutionTrigger"
   [2025-06-19, 02:21:54] INFO - run-id 748193847562019 in run state {'life_cycle_state': 'RUNNING', 'result_state': '', 'state_message': ''}. sleeping for 30 seconds: source="airflow.providers.databricks.triggers.databricks.DatabricksExecutionTrigger"
   [2025-06-19, 02:22:24] INFO - run-id 748193847562019 in run state {'life_cycle_state': 'RUNNING', 'result_state': '', 'state_message': ''}. sleeping for 30 seconds: source="airflow.providers.databricks.triggers.databricks.DatabricksExecutionTrigger"
   [2025-06-19, 02:22:54] INFO - run-id 748193847562019 in run state {'life_cycle_state': 'RUNNING', 'result_state': '', 'state_message': ''}. sleeping for 30 seconds: source="airflow.providers.databricks.triggers.databricks.DatabricksExecutionTrigger"
   [2025-06-19, 02:23:25] INFO - run-id 748193847562019 in run state {'life_cycle_state': 'RUNNING', 'result_state': '', 'state_message': ''}. sleeping for 30 seconds: source="airflow.providers.databricks.triggers.databricks.DatabricksExecutionTrigger"
   [2025-06-19, 02:23:55] INFO - run-id 748193847562019 in run state {'life_cycle_state': 'RUNNING', 'result_state': '', 'state_message': ''}. sleeping for 30 seconds: source="airflow.providers.databricks.triggers.databricks.DatabricksExecutionTrigger"
   [2025-06-19, 02:24:26] INFO - run-id 748193847562019 in run state {'life_cycle_state': 'RUNNING', 'result_state': '', 'state_message': ''}. sleeping for 30 seconds: source="airflow.providers.databricks.triggers.databricks.DatabricksExecutionTrigger"
   [2025-06-19, 02:24:56] INFO - run-id 748193847562019 in run state {'life_cycle_state': 'RUNNING', 'result_state': '', 'state_message': ''}. sleeping for 30 seconds: source="airflow.providers.databricks.triggers.databricks.DatabricksExecutionTrigger"
   [2025-06-19, 02:25:26] INFO - run-id 748193847562019 in run state {'life_cycle_state': 'RUNNING', 'result_state': '', 'state_message': ''}. sleeping for 30 seconds: source="airflow.providers.databricks.triggers.databricks.DatabricksExecutionTrigger"
   [2025-06-19, 02:25:56] INFO - Trigger fired event: name="sit_trigger_databricks_job_run/manual__2025-06-19T02:21:11.927855+00:00/trigger_databricks_job_run/-1/1 (ID 15)": result="TriggerEvent<{'run_id': 748193847562019, 'run_page_url': 'https://adb-8274910385627419.47.azuredatabricks.net/?o=9182736450192837#job/604918372746183/run/748193847562019', 'run_state': '{\"life_cycle_state\": \"TERMINATED\", \"result_state\": \"SUCCESS\", \"state_message\": \"\"}', 'repair_run': False, 'errors': []}>"
   ```
   
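   In the log above, both errors are emitted at 02:21:23. They appear immediately after the trigger's `DatabricksHook` refreshes its AAD token through the async Azure credential (`source="azure.identity.aio._internal.get_token_mixin"`) and before the first `DatabricksExecutionTrigger` poll result is logged. The job itself completes successfully, but the logs point to a gap in async resource cleanup.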
   
   ### Are you willing to submit PR?
   
   - [x] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.