This is an automated email from the ASF dual-hosted git repository.

shahar1 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new bfec6cfaa16 Add Cloud SQL Auth Proxy IAM authentication (#66510)
bfec6cfaa16 is described below

commit bfec6cfaa168c10cb5983b9cabe256bd6118ab7a
Author: Aaron Chen <[email protected]>
AuthorDate: Tue May 12 07:18:55 2026 -0700

    Add Cloud SQL Auth Proxy IAM authentication (#66510)
---
 providers/google/docs/connections/gcp_sql.rst      |  73 +++++++-
 .../providers/google/cloud/hooks/cloud_sql.py      |  39 +++-
 .../cloud_sql/example_cloud_sql_query_proxy_iam.py | 204 +++++++++++++++++++++
 .../unit/google/cloud/hooks/test_cloud_sql.py      | 142 ++++++++++++++
 4 files changed, 444 insertions(+), 14 deletions(-)

diff --git a/providers/google/docs/connections/gcp_sql.rst 
b/providers/google/docs/connections/gcp_sql.rst
index 22efe50d8e1..f0d675b0172 100644
--- a/providers/google/docs/connections/gcp_sql.rst
+++ b/providers/google/docs/connections/gcp_sql.rst
@@ -44,8 +44,9 @@ Schema (optional)
 Login (required)
     Specify the user name to connect.
 
-Password (required)
-    Specify the password to connect.
+Password (required unless IAM authentication is used)
+    Specify the password to connect. Leave it empty when using IAM 
authentication with either
+    ``use_iam`` or ``sql_proxy_enable_iam_login``.
 
 Extra (optional)
     Specify the extra parameters (as JSON dictionary) that can be used in 
Google Cloud SQL
@@ -80,9 +81,16 @@ Extra (optional)
 Configuring and using IAM authentication
 ----------------------------------------
 
+The Google provider supports two IAM authentication paths:
+
+* Direct IAM token authentication with ``use_iam``. Airflow generates a 
database login token and uses
+  it as the database password.
+* Cloud SQL Auth Proxy IAM authentication with ``sql_proxy_enable_iam_login``. 
Airflow starts Cloud SQL
+  Auth Proxy with IAM database authentication enabled and connects with an 
empty password.
+
 .. warning::
-  This functionality requires ``gcloud`` command (Google Cloud SDK) must be 
`installed
-  <https://cloud.google.com/sdk/docs/install>`_ on the Airflow worker.
+  Direct IAM token authentication with ``use_iam`` requires the ``gcloud`` 
command (Google Cloud SDK)
+  to be `installed <https://cloud.google.com/sdk/docs/install>`_ on the 
Airflow worker.
 
 .. warning::
   IAM authentication working only for Google Service Accounts.
@@ -101,11 +109,12 @@ Here are links describing what should be done before the 
start: `PostgreSQL
 <https://cloud.google.com/sql/docs/postgres/iam-logins#before_you_begin>`_ and 
`MySQL
 <https://cloud.google.com/sql/docs/mysql/iam-logins#before_you_begin>`_.
 
-Configure ``gcpcloudsql`` connection with IAM enabling
-""""""""""""""""""""""""""""""""""""""""""""""""""""""
+Configure ``gcpcloudsql`` connection with direct IAM token authentication
+"""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""
 
 For using IAM you need to enable ``"use_iam": "True"`` in the ``extra`` field. 
And specify IAM account in this format
 ``USERNAME@PROJECT_ID.iam.gserviceaccount.com`` in ``login`` field and empty 
string in the ``password`` field.
+Do not combine ``use_iam`` with ``sql_proxy_enable_iam_login``.
 
 For example:
 
@@ -113,3 +122,55 @@ For example:
     :language: python
     :start-after: [START howto_operator_cloudsql_iam_connections]
     :end-before: [END howto_operator_cloudsql_iam_connections]
+
+Configure ``gcpcloudsql`` connection with Cloud SQL Auth Proxy IAM 
authentication
+"""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""
+
+For using Cloud SQL Auth Proxy IAM authentication, enable ``"use_proxy": 
"True"`` and
+``"sql_proxy_enable_iam_login": "True"`` in the ``extra`` field. With the 
current Cloud SQL Auth Proxy
+v1 integration this option is supported for both Postgres and MySQL. Airflow 
passes
+``-enable_iam_login`` to the proxy, so the ``password`` field can be empty.
+
+Example "extras" field for Postgres:
+
+.. code-block:: json
+
+   {
+      "database_type": "postgres",
+      "project_id": "example-project",
+      "location": "europe-west1",
+      "instance": "testinstance",
+      "use_proxy": true,
+      "sql_proxy_use_tcp": true,
+      "sql_proxy_enable_iam_login": true
+   }
+
+Example "extras" field for MySQL:
+
+.. code-block:: json
+
+   {
+      "database_type": "mysql",
+      "project_id": "example-project",
+      "location": "europe-west1",
+      "instance": "testinstance",
+      "use_proxy": true,
+      "sql_proxy_use_tcp": true,
+      "sql_proxy_enable_iam_login": true
+   }
+
+.. note::
+   Cloud SQL for MySQL does not grant database-level privileges to IAM 
service-account users
+   automatically when the user is created. After creating the IAM 
service-account user (for example
+   via ``gcloud sql users create <user> --type=cloud_iam_service_account``) a 
database administrator
+   must grant the required privileges using SQL, for example
+   ``GRANT SELECT ON <database>.* TO '<service-account-prefix>'@'%';``. This 
is a Cloud SQL operational
+   step and is outside the scope of Airflow. Cloud SQL for Postgres does not 
have this requirement
+   for the default ``public`` schema.
+
+For example:
+
+.. exampleinclude:: 
/../../google/tests/system/google/cloud/cloud_sql/example_cloud_sql_query_proxy_iam.py
+    :language: python
+    :start-after: [START howto_operator_cloudsql_proxy_iam_connections]
+    :end-before: [END howto_operator_cloudsql_proxy_iam_connections]
diff --git 
a/providers/google/src/airflow/providers/google/cloud/hooks/cloud_sql.py 
b/providers/google/src/airflow/providers/google/cloud/hooks/cloud_sql.py
index 9b165a7030b..eda0e341615 100644
--- a/providers/google/src/airflow/providers/google/cloud/hooks/cloud_sql.py
+++ b/providers/google/src/airflow/providers/google/cloud/hooks/cloud_sql.py
@@ -528,6 +528,8 @@ class CloudSqlProxyRunner(LoggingMixin):
         project_id: str = PROVIDE_PROJECT_ID,
         sql_proxy_version: str | None = None,
         sql_proxy_binary_path: str | None = None,
+        *,
+        sql_proxy_enable_iam_login: bool = False,
     ) -> None:
         super().__init__()
         self.path_prefix = path_prefix
@@ -540,6 +542,7 @@ class CloudSqlProxyRunner(LoggingMixin):
         self.instance_specification = instance_specification
         self.project_id = project_id
         self.gcp_conn_id = gcp_conn_id
+        self.sql_proxy_enable_iam_login = sql_proxy_enable_iam_login
         self.command_line_parameters: list[str] = []
         self.cloud_sql_proxy_socket_directory = self.path_prefix
         self.sql_proxy_path = sql_proxy_binary_path or 
f"{self.path_prefix}_cloud_sql_proxy"
@@ -549,6 +552,8 @@ class CloudSqlProxyRunner(LoggingMixin):
     def _build_command_line_parameters(self) -> None:
         self.command_line_parameters.extend(["-dir", 
self.cloud_sql_proxy_socket_directory])
         self.command_line_parameters.extend(["-instances", 
self.instance_specification])
+        if self.sql_proxy_enable_iam_login:
+            self.command_line_parameters.append("-enable_iam_login")
 
     @staticmethod
     def _is_os_64bit() -> bool:
@@ -788,6 +793,9 @@ class CloudSQLDatabaseHook(BaseHook):
       You cannot use proxy and SSL together.
     * **use_iam** - (default False) Whether IAM should be used to connect to 
Cloud SQL DB.
       With using IAM password field should be empty string.
+    * **sql_proxy_enable_iam_login** - (default False) Whether Cloud SQL Auth 
Proxy should use
+      IAM database authentication. This requires ``use_proxy`` and is 
supported with the current
+      Cloud SQL Auth Proxy v1 integration for both Postgres and MySQL.
     * **sql_proxy_use_tcp** - (default False) If set to true, TCP is used to 
connect via
       proxy, otherwise UNIX sockets are used.
     * **sql_proxy_version** -  Specific version of the proxy to download (for 
example
@@ -852,15 +860,12 @@ class CloudSQLDatabaseHook(BaseHook):
         self.use_proxy = self._get_bool(self.extras.get("use_proxy", "False"))
         self.use_ssl = self._get_bool(self.extras.get("use_ssl", "False"))
         self.use_iam = self._get_bool(self.extras.get("use_iam", "False"))
+        self.sql_proxy_enable_iam_login = self._get_bool(
+            self.extras.get("sql_proxy_enable_iam_login", "False")
+        )
         self.sql_proxy_use_tcp = 
self._get_bool(self.extras.get("sql_proxy_use_tcp", "False"))
         self.sql_proxy_version = self.extras.get("sql_proxy_version")
         self.sql_proxy_binary_path = sql_proxy_binary_path
-        if self.use_iam:
-            self.user = self._get_iam_db_login()
-            self.password = 
self._generate_login_token(service_account=self.cloudsql_connection.login)
-        else:
-            self.user = cast("str", self.cloudsql_connection.login)
-            self.password = cast("str", self.cloudsql_connection.password)
         self.public_ip = self.cloudsql_connection.host
         self.public_port = self.cloudsql_connection.port
         self.ssl_cert = ssl_cert
@@ -876,7 +881,18 @@ class CloudSQLDatabaseHook(BaseHook):
         # Generated based on clock + clock sequence. Unique per host (!).
         # This is important as different hosts share the database
         self.db_conn_id = str(uuid.uuid1())
+        # Validate before resolving user/password so invalid configs fail fast,
+        # without spawning the gcloud subprocess used by 
``_generate_login_token``.
         self._validate_inputs()
+        if self.use_iam:
+            self.user = self._get_iam_db_login()
+            self.password = 
self._generate_login_token(service_account=self.cloudsql_connection.login)
+        elif self.sql_proxy_enable_iam_login:
+            self.user = self._get_iam_db_login()
+            self.password = self.cloudsql_connection.password or ""
+        else:
+            self.user = cast("str", self.cloudsql_connection.login)
+            self.password = cast("str", self.cloudsql_connection.password)
 
     @property
     def sslcert(self) -> str | None:
@@ -989,6 +1005,12 @@ class CloudSQLDatabaseHook(BaseHook):
                 " SSL is not needed as Cloud SQL Proxy "
                 "provides encryption on its own"
             )
+        if self.use_iam and self.sql_proxy_enable_iam_login:
+            raise ValueError(
+                "use_iam (direct IAM token) and sql_proxy_enable_iam_login 
(proxy IAM) are mutually exclusive"
+            )
+        if self.sql_proxy_enable_iam_login and not self.use_proxy:
+            raise ValueError("sql_proxy_enable_iam_login requires use_proxy to 
be True")
         if any([self.ssl_key, self.ssl_cert, self.ssl_root_cert]) and 
self.ssl_secret_id:
             raise AirflowException(
                 "Invalid SSL settings. Please use either all of parameters 
['ssl_cert', 'ssl_cert', "
@@ -1073,7 +1095,7 @@ class CloudSQLDatabaseHook(BaseHook):
             raise AirflowException("The login parameter needs to be set in 
connection")
         if not self.public_ip:
             raise AirflowException("The host parameter needs to be set in 
connection")
-        if not self.password:
+        if not self.password and not self.sql_proxy_enable_iam_login:
             raise AirflowException("The password parameter needs to be set in 
connection")
         if not self.database:
             raise AirflowException("The database parameter needs to be set in 
connection")
@@ -1136,7 +1158,7 @@ class CloudSQLDatabaseHook(BaseHook):
             raise AirflowException("The login parameter needs to be set in 
connection")
         if not self.public_ip:
             raise AirflowException("The host parameter needs to be set in 
connection")
-        if not self.password:
+        if not self.password and not self.sql_proxy_enable_iam_login:
             raise AirflowException("The password parameter needs to be set in 
connection")
         if not self.database:
             raise AirflowException("The database parameter needs to be set in 
connection")
@@ -1227,6 +1249,7 @@ class CloudSQLDatabaseHook(BaseHook):
             sql_proxy_version=self.sql_proxy_version,
             sql_proxy_binary_path=self.sql_proxy_binary_path,
             gcp_conn_id=self.gcp_conn_id,
+            sql_proxy_enable_iam_login=self.sql_proxy_enable_iam_login,
         )
 
     def get_database_hook(self, connection: Connection) -> DbApiHook:
diff --git 
a/providers/google/tests/system/google/cloud/cloud_sql/example_cloud_sql_query_proxy_iam.py
 
b/providers/google/tests/system/google/cloud/cloud_sql/example_cloud_sql_query_proxy_iam.py
new file mode 100644
index 00000000000..9ecb597119a
--- /dev/null
+++ 
b/providers/google/tests/system/google/cloud/cloud_sql/example_cloud_sql_query_proxy_iam.py
@@ -0,0 +1,204 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Example Airflow Dag that performs a query in a Postgres Cloud SQL instance 
with proxy IAM authentication.
+"""
+
+from __future__ import annotations
+
+import json
+import os
+from copy import deepcopy
+from datetime import datetime
+from typing import Any
+
+from googleapiclient import discovery
+
+from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS
+
+if AIRFLOW_V_3_0_PLUS:
+    from airflow.sdk import task
+else:
+    # Airflow 2 path
+    from airflow.decorators import task  # type: ignore[attr-defined,no-redef]
+from airflow.models.dag import DAG
+from airflow.providers.google.cloud.operators.cloud_sql import (
+    CloudSQLCreateInstanceDatabaseOperator,
+    CloudSQLCreateInstanceOperator,
+    CloudSQLDeleteInstanceOperator,
+    CloudSQLExecuteQueryOperator,
+)
+
+try:
+    from airflow.sdk import TriggerRule
+except ImportError:
+    # Compatibility for Airflow < 3.1
+    from airflow.utils.trigger_rule import TriggerRule  # type: 
ignore[no-redef,attr-defined]
+
+from system.google import DEFAULT_GCP_SYSTEM_TEST_PROJECT_ID
+from system.google.gcp_api_client_helpers import create_airflow_connection, 
delete_airflow_connection
+
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT") or 
DEFAULT_GCP_SYSTEM_TEST_PROJECT_ID
+DAG_ID = "cloudsql_query_proxy_iam"
+REGION = "us-central1"
+IS_COMPOSER = bool(os.environ.get("COMPOSER_ENVIRONMENT", ""))
+
+CLOUD_SQL_INSTANCE_NAME = f"{ENV_ID}-{DAG_ID}-postgres".replace("_", "-")
+CLOUD_SQL_DATABASE_NAME = "test_db"
+CLOUD_IAM_SA = os.environ.get("SYSTEM_TESTS_CLOUDSQL_SA", "test_iam_sa")
+CLOUD_SQL_IAM_SA = CLOUD_IAM_SA.split(".gserviceaccount.com")[0]
+CLOUD_SQL_IP_ADDRESS = "127.0.0.1"
+CLOUD_SQL_PUBLIC_PORT = 5432
+CONNECTION_PROXY_IAM_ID = f"{DAG_ID}_{ENV_ID}_proxy_iam"
+
+CLOUD_SQL_INSTANCE_CREATE_BODY: dict[str, Any] = {
+    "name": CLOUD_SQL_INSTANCE_NAME,
+    "settings": {
+        "tier": "db-custom-1-3840",
+        "dataDiskSizeGb": 30,
+        "pricingPlan": "PER_USE",
+        "ipConfiguration": {"ipv4Enabled": True},
+        "databaseFlags": [{"name": "cloudsql.iam_authentication", "value": 
"on"}],
+    },
+    "databaseVersion": "POSTGRES_15",
+    "region": REGION,
+}
+
+# [START howto_operator_cloudsql_proxy_iam_connections]
+CONNECTION_WITH_PROXY_IAM_KWARGS = {
+    "conn_type": "gcpcloudsql",
+    "login": CLOUD_IAM_SA,
+    "password": "",
+    "host": CLOUD_SQL_IP_ADDRESS,
+    "port": CLOUD_SQL_PUBLIC_PORT,
+    "schema": CLOUD_SQL_DATABASE_NAME,
+    "extra": {
+        "database_type": "postgres",
+        "project_id": PROJECT_ID,
+        "location": REGION,
+        "instance": CLOUD_SQL_INSTANCE_NAME,
+        "use_proxy": "True",
+        "sql_proxy_use_tcp": "True",
+        "sql_proxy_enable_iam_login": "True",
+    },
+}
+# [END howto_operator_cloudsql_proxy_iam_connections]
+
+
+def cloud_sql_database_create_body(instance: str) -> dict[str, Any]:
+    """Generates a Cloud SQL database creation body."""
+    return {
+        "instance": instance,
+        "name": CLOUD_SQL_DATABASE_NAME,
+        "project": PROJECT_ID,
+    }
+
+
+with DAG(
+    dag_id=DAG_ID,
+    start_date=datetime(2026, 1, 1),
+    schedule="@once",
+    catchup=False,
+    tags=["example", "cloudsql", "postgres"],
+) as dag:
+    create_cloud_sql_instance = CloudSQLCreateInstanceOperator(
+        task_id="create_cloud_sql_instance_postgres",
+        project_id=PROJECT_ID,
+        instance=CLOUD_SQL_INSTANCE_NAME,
+        body=CLOUD_SQL_INSTANCE_CREATE_BODY,
+    )
+
+    create_database = CloudSQLCreateInstanceDatabaseOperator(
+        task_id="create_database_postgres",
+        body=cloud_sql_database_create_body(instance=CLOUD_SQL_INSTANCE_NAME),
+        instance=CLOUD_SQL_INSTANCE_NAME,
+    )
+
+    @task(task_id="create_user_postgres")
+    def create_user(instance: str, service_account: str) -> None:
+        with discovery.build("sqladmin", "v1beta4") as service:
+            request = service.users().insert(
+                project=PROJECT_ID,
+                instance=instance,
+                body={
+                    "name": service_account,
+                    "type": "CLOUD_IAM_SERVICE_ACCOUNT",
+                },
+            )
+            request.execute()
+
+    create_user_task = create_user(instance=CLOUD_SQL_INSTANCE_NAME, 
service_account=CLOUD_SQL_IAM_SA)
+
+    @task(task_id="create_connection_postgres")
+    def create_connection(connection_id: str, instance: str) -> str:
+        connection: dict[str, Any] = deepcopy(CONNECTION_WITH_PROXY_IAM_KWARGS)
+        connection["extra"]["instance"] = instance
+        connection["extra"] = json.dumps(connection["extra"])
+        create_airflow_connection(
+            connection_id=connection_id, connection_conf=connection, 
is_composer=IS_COMPOSER
+        )
+        return connection_id
+
+    create_connection_task = create_connection(
+        connection_id=CONNECTION_PROXY_IAM_ID,
+        instance=CLOUD_SQL_INSTANCE_NAME,
+    )
+
+    query_task = CloudSQLExecuteQueryOperator(
+        gcp_cloudsql_conn_id=CONNECTION_PROXY_IAM_ID,
+        task_id="example_cloud_sql_query_proxy_iam_postgres",
+        sql=["SELECT 1"],
+    )
+
+    delete_instance = CloudSQLDeleteInstanceOperator(
+        task_id="delete_cloud_sql_instance_postgres",
+        project_id=PROJECT_ID,
+        instance=CLOUD_SQL_INSTANCE_NAME,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+
+    @task(task_id="delete_connection_postgres")
+    def delete_connection(connection_id: str) -> None:
+        delete_airflow_connection(connection_id=connection_id, 
is_composer=IS_COMPOSER)
+
+    delete_connection_task = 
delete_connection(connection_id=CONNECTION_PROXY_IAM_ID)
+
+    (
+        # TEST SETUP
+        create_cloud_sql_instance
+        >> [create_database, create_user_task]
+        >> create_connection_task
+        # TEST BODY
+        >> query_task
+        # TEST TEARDOWN
+        >> [delete_instance, delete_connection_task]
+    )
+
+    # ### Everything below this line is not part of example ###
+    # ### Just for system tests purpose ###
+    from tests_common.test_utils.watcher import watcher
+
+    # This test needs watcher in order to properly mark success/failure
+    # when "tearDown" task with trigger rule is part of the Dag
+    list(dag.tasks) >> watcher()
+
+from tests_common.test_utils.system_tests import get_test_run  # noqa: E402
+
+# Needed to run the example Dag with pytest (see: 
contributing-docs/testing/system_tests.rst)
+test_run = get_test_run(dag)
diff --git a/providers/google/tests/unit/google/cloud/hooks/test_cloud_sql.py 
b/providers/google/tests/unit/google/cloud/hooks/test_cloud_sql.py
index 5f23068c824..ccaa3c010f2 100644
--- a/providers/google/tests/unit/google/cloud/hooks/test_cloud_sql.py
+++ b/providers/google/tests/unit/google/cloud/hooks/test_cloud_sql.py
@@ -787,6 +787,12 @@ def _parse_from_uri(uri: str):
     return connection_parameters
 
 
+def _connection_from_uri(uri: str):
+    if AIRFLOW_V_3_1_PLUS:
+        return Connection(conn_id="test_conn_id", **_parse_from_uri(uri))
+    return Connection(uri=uri)  # type: ignore[call-arg]
+
+
 class TestCloudSqlDatabaseHook:
     
@mock.patch("airflow.providers.google.cloud.hooks.cloud_sql.CloudSQLDatabaseHook.get_connection")
     def test_cloudsql_database_hook_validate_ssl_certs_no_ssl(self, 
get_connection):
@@ -1639,6 +1645,106 @@ class TestCloudSqlDatabaseQueryHook:
         assert connection.port != 3200
         assert connection.schema == "testdb"
 
+    
@mock.patch("airflow.providers.google.cloud.hooks.cloud_sql.CloudSQLDatabaseHook.get_connection")
+    def test_hook_with_proxy_iam_postgres_tcp(self, get_connection):
+        uri = (
+            
"gcpcloudsql://service-account%40project.iam.gserviceaccount.com:@127.0.0.1:5432/"
+            
"testdb?database_type=postgres&project_id=example-project&location=europe-west1&"
+            
"instance=testdb&use_proxy=True&sql_proxy_use_tcp=True&sql_proxy_enable_iam_login=True"
+        )
+        get_connection.side_effect = [_connection_from_uri(uri)]
+        with mock.patch(
+            
"airflow.providers.google.cloud.hooks.cloud_sql.CloudSQLDatabaseHook._generate_login_token"
+        ) as generate_login_token:
+            hook = CloudSQLDatabaseHook()
+            connection = hook.create_connection()
+
+        assert connection.conn_type == "postgres"
+        assert connection.login == "[email protected]"
+        assert connection.password in ("", None)
+        assert connection.host == "127.0.0.1"
+        assert connection.port != 5432
+        assert connection.schema == "testdb"
+        generate_login_token.assert_not_called()
+
+        sqlproxy_runner = hook.get_sqlproxy_runner()
+        assert sqlproxy_runner.sql_proxy_enable_iam_login is True
+        assert "-enable_iam_login" in sqlproxy_runner.command_line_parameters
+
+    
@mock.patch("airflow.providers.google.cloud.hooks.cloud_sql.CloudSQLDatabaseHook.get_connection")
+    def test_hook_with_proxy_iam_generates_uri_with_empty_password(self, 
get_connection):
+        uri = (
+            
"gcpcloudsql://service-account%40project.iam.gserviceaccount.com:@127.0.0.1:5432/"
+            
"testdb?database_type=postgres&project_id=example-project&location=europe-west1&"
+            
"instance=testdb&use_proxy=True&sql_proxy_use_tcp=True&sql_proxy_enable_iam_login=True"
+        )
+        get_connection.side_effect = [_connection_from_uri(uri)]
+        hook = CloudSQLDatabaseHook()
+
+        connection_uri = hook._generate_connection_uri()
+
+        assert 
connection_uri.startswith("postgresql://service-account%40project.iam:@127.0.0.1:")
+        assert ":@" in connection_uri
+
+    
@mock.patch("airflow.providers.google.cloud.hooks.cloud_sql.CloudSQLDatabaseHook.get_connection")
+    def test_hook_with_proxy_iam_mutually_exclusive_with_use_iam(self, 
get_connection):
+        uri = (
+            
"gcpcloudsql://service-account%40project.iam.gserviceaccount.com:@127.0.0.1:5432/"
+            
"testdb?database_type=postgres&project_id=example-project&location=europe-west1&"
+            
"instance=testdb&use_proxy=True&sql_proxy_use_tcp=True&use_iam=True&"
+            "sql_proxy_enable_iam_login=True"
+        )
+        get_connection.side_effect = [_connection_from_uri(uri)]
+
+        with mock.patch(
+            
"airflow.providers.google.cloud.hooks.cloud_sql.CloudSQLDatabaseHook._generate_login_token"
+        ):
+            with pytest.raises(ValueError, match="mutually exclusive"):
+                CloudSQLDatabaseHook()
+
+    
@mock.patch("airflow.providers.google.cloud.hooks.cloud_sql.CloudSQLDatabaseHook.get_connection")
+    def test_hook_with_proxy_iam_requires_use_proxy(self, get_connection):
+        uri = (
+            
"gcpcloudsql://service-account%40project.iam.gserviceaccount.com:@127.0.0.1:5432/"
+            
"testdb?database_type=postgres&project_id=example-project&location=europe-west1&"
+            "instance=testdb&use_proxy=False&sql_proxy_enable_iam_login=True"
+        )
+        get_connection.side_effect = [_connection_from_uri(uri)]
+
+        with mock.patch(
+            
"airflow.providers.google.cloud.hooks.cloud_sql.CloudSQLDatabaseHook._generate_login_token"
+        ) as generate_login_token:
+            with pytest.raises(ValueError, match="requires use_proxy to be 
True"):
+                CloudSQLDatabaseHook()
+
+        generate_login_token.assert_not_called()
+
+    
@mock.patch("airflow.providers.google.cloud.hooks.cloud_sql.CloudSQLDatabaseHook.get_connection")
+    def test_hook_with_proxy_iam_mysql_tcp(self, get_connection):
+        uri = (
+            
"gcpcloudsql://service-account%40project.iam.gserviceaccount.com:@127.0.0.1:3306/"
+            
"testdb?database_type=mysql&project_id=example-project&location=europe-west1&"
+            
"instance=testdb&use_proxy=True&sql_proxy_use_tcp=True&sql_proxy_enable_iam_login=True"
+        )
+        get_connection.side_effect = [_connection_from_uri(uri)]
+        with mock.patch(
+            
"airflow.providers.google.cloud.hooks.cloud_sql.CloudSQLDatabaseHook._generate_login_token"
+        ) as generate_login_token:
+            hook = CloudSQLDatabaseHook()
+            connection = hook.create_connection()
+
+        assert connection.conn_type == "mysql"
+        assert connection.login == "service-account"
+        assert connection.password in ("", None)
+        assert connection.host == "127.0.0.1"
+        assert connection.port != 3306
+        assert connection.schema == "testdb"
+        generate_login_token.assert_not_called()
+
+        sqlproxy_runner = hook.get_sqlproxy_runner()
+        assert sqlproxy_runner.sql_proxy_enable_iam_login is True
+        assert "-enable_iam_login" in sqlproxy_runner.command_line_parameters
+
     
@mock.patch("airflow.providers.google.cloud.hooks.cloud_sql.CloudSQLDatabaseHook.get_connection")
     def test_hook_with_correct_parameters_mysql(self, get_connection):
         uri = (
@@ -1756,6 +1862,42 @@ class TestCloudSqlProxyRunner:
         with pytest.raises(ValueError, match="The sql_proxy_version should 
match the regular expression"):
             runner._get_sql_proxy_download_url()
 
+    def test_cloud_sql_proxy_runner_adds_enable_iam_login_flag(self):
+        runner = CloudSqlProxyRunner(
+            path_prefix="12345678",
+            instance_specification="project:us-east-1:instance",
+            sql_proxy_enable_iam_login=True,
+        )
+
+        assert "-enable_iam_login" in runner.command_line_parameters
+
+    def 
test_cloud_sql_proxy_runner_does_not_add_enable_iam_login_by_default(self):
+        runner = CloudSqlProxyRunner(
+            path_prefix="12345678",
+            instance_specification="project:us-east-1:instance",
+        )
+
+        assert "-enable_iam_login" not in runner.command_line_parameters
+
+    
@mock.patch("airflow.providers.google.cloud.hooks.cloud_sql.GoogleBaseHook.get_connection")
+    def 
test_cloud_sql_proxy_runner_keeps_key_path_credentials_with_iam_login(self, 
get_connection):
+        connection = Connection(conn_id="google_conn", 
conn_type="google_cloud_platform")
+        if AIRFLOW_V_3_1_PLUS:
+            connection.extra = json.dumps({"key_path": "/tmp/key.json"})
+        else:
+            connection.set_extra(json.dumps({"key_path": "/tmp/key.json"}))
+        get_connection.return_value = connection
+        runner = CloudSqlProxyRunner(
+            path_prefix="12345678",
+            # Non-empty instance specification avoids adding -projects for 
forwarding all instances.
+            instance_specification="project:us-east-1:instance",
+            gcp_conn_id="google_conn",
+            sql_proxy_enable_iam_login=True,
+        )
+
+        assert runner._get_credential_parameters() == ["-credential_file", 
"/tmp/key.json"]
+        assert "-enable_iam_login" in runner.command_line_parameters
+
 
 class TestCloudSQLAsyncHook:
     @pytest.mark.asyncio

Reply via email to