This is an automated email from the ASF dual-hosted git repository.

beto pushed a commit to branch aws-iam
in repository https://gitbox.apache.org/repos/asf/superset.git

commit af16bd9a19884e7e93ce1c35a5d2af1c62a07a77
Author: Beto Dealmeida <[email protected]>
AuthorDate: Mon Jan 26 10:41:15 2026 -0500

    Provisioned Redshift clusters
---
 superset/db_engine_specs/aws_iam.py                | 153 +++++++++++--
 tests/unit_tests/db_engine_specs/test_aws_iam.py   | 247 ++++++++++++++++++++-
 .../db_engine_specs/test_redshift_iam.py           | 137 ++++++++++++
 3 files changed, 518 insertions(+), 19 deletions(-)

diff --git a/superset/db_engine_specs/aws_iam.py 
b/superset/db_engine_specs/aws_iam.py
index 29145b811a5..cdd597af741 100644
--- a/superset/db_engine_specs/aws_iam.py
+++ b/superset/db_engine_specs/aws_iam.py
@@ -70,6 +70,8 @@ class AWSIAMConfig(TypedDict, total=False):
     # Redshift Serverless fields
     workgroup_name: str
     db_name: str
+    # Redshift provisioned cluster fields
+    cluster_identifier: str
 
 
 class AWSIAMAuthMixin:
@@ -318,7 +320,68 @@ class AWSIAMAuthMixin:
         except ClientError as ex:
             raise SupersetSecurityException(
                 SupersetError(
-                    message=f"Failed to get Redshift credentials: {ex}",
+                    message=f"Failed to get Redshift Serverless credentials: 
{ex}",
+                    
error_type=SupersetErrorType.CONNECTION_ACCESS_DENIED_ERROR,
+                    level=ErrorLevel.ERROR,
+                )
+            ) from ex
+
+    @classmethod
+    def generate_redshift_cluster_credentials(
+        cls,
+        credentials: dict[str, Any],
+        cluster_identifier: str,
+        db_user: str,
+        db_name: str,
+        region: str,
+        auto_create: bool = False,
+    ) -> tuple[str, str]:
+        """
+        Generate credentials for a provisioned Redshift cluster using temporary
+        STS credentials.
+
+        :param credentials: STS credentials from assume_role
+        :param cluster_identifier: Redshift cluster identifier
+        :param db_user: Database username to get credentials for
+        :param db_name: Redshift database name
+        :param region: AWS region
+        :param auto_create: Whether to auto-create the database user if it 
doesn't exist
+        :returns: Tuple of (username, password) for Redshift connection
+        :raises SupersetSecurityException: If credential generation fails
+        """
+        try:
+            import boto3
+            from botocore.exceptions import ClientError
+        except ImportError as ex:
+            raise SupersetSecurityException(
+                SupersetError(
+                    message="boto3 is required for AWS IAM authentication.",
+                    error_type=SupersetErrorType.GENERIC_DB_ENGINE_ERROR,
+                    level=ErrorLevel.ERROR,
+                )
+            ) from ex
+
+        try:
+            client = boto3.client(
+                "redshift",
+                region_name=region,
+                aws_access_key_id=credentials["AccessKeyId"],
+                aws_secret_access_key=credentials["SecretAccessKey"],
+                aws_session_token=credentials["SessionToken"],
+            )
+
+            response = client.get_cluster_credentials(
+                ClusterIdentifier=cluster_identifier,
+                DbUser=db_user,
+                DbName=db_name,
+                AutoCreate=auto_create,
+            )
+            return response["DbUser"], response["DbPassword"]
+
+        except ClientError as ex:
+            raise SupersetSecurityException(
+                SupersetError(
+                    message=f"Failed to get Redshift cluster credentials: 
{ex}",
                     
error_type=SupersetErrorType.CONNECTION_ACCESS_DENIED_ERROR,
                     level=ErrorLevel.ERROR,
                 )
@@ -441,7 +504,11 @@ class AWSIAMAuthMixin:
         iam_config: AWSIAMConfig,
     ) -> None:
         """
-        Apply Redshift Serverless IAM authentication to connection parameters.
+        Apply Redshift IAM authentication to connection parameters.
+
+        Supports both Redshift Serverless (workgroup_name) and provisioned
+        clusters (cluster_identifier). The method auto-detects which type
+        based on the configuration provided.
 
         Flow: assume role -> get Redshift credentials -> update connect_args 
-> SSL.
 
@@ -455,20 +522,56 @@ class AWSIAMAuthMixin:
         region = iam_config.get("region")
         external_id = iam_config.get("external_id")
         session_duration = iam_config.get("session_duration", 
DEFAULT_SESSION_DURATION)
+
+        # Serverless fields
         workgroup_name = iam_config.get("workgroup_name")
+
+        # Provisioned cluster fields
+        cluster_identifier = iam_config.get("cluster_identifier")
+        db_username = iam_config.get("db_username")
+
+        # Common field
         db_name = iam_config.get("db_name")
 
-        # Validate required fields
+        # Determine deployment type
+        is_serverless = bool(workgroup_name)
+        is_provisioned = bool(cluster_identifier)
+
+        if is_serverless and is_provisioned:
+            raise SupersetSecurityException(
+                SupersetError(
+                    message="AWS IAM configuration cannot have both 
workgroup_name "
+                    "(Serverless) and cluster_identifier (provisioned). "
+                    "Please specify only one.",
+                    
error_type=SupersetErrorType.CONNECTION_MISSING_PARAMETERS_ERROR,
+                    level=ErrorLevel.ERROR,
+                )
+            )
+
+        if not is_serverless and not is_provisioned:
+            raise SupersetSecurityException(
+                SupersetError(
+                    message="AWS IAM configuration must include either 
workgroup_name "
+                    "(for Redshift Serverless) or cluster_identifier "
+                    "(for provisioned Redshift clusters).",
+                    
error_type=SupersetErrorType.CONNECTION_MISSING_PARAMETERS_ERROR,
+                    level=ErrorLevel.ERROR,
+                )
+            )
+
+        # Validate common required fields
         missing_fields = []
         if not role_arn:
             missing_fields.append("role_arn")
         if not region:
             missing_fields.append("region")
-        if not workgroup_name:
-            missing_fields.append("workgroup_name")
         if not db_name:
             missing_fields.append("db_name")
 
+        # Validate provisioned cluster specific fields
+        if is_provisioned and not db_username:
+            missing_fields.append("db_username")
+
         if missing_fields:
             raise SupersetSecurityException(
                 SupersetError(
@@ -482,14 +585,8 @@ class AWSIAMAuthMixin:
         # Type assertions after validation
         assert role_arn is not None
         assert region is not None
-        assert workgroup_name is not None
         assert db_name is not None
 
-        logger.debug(
-            "Applying Redshift IAM authentication for workgroup %s",
-            workgroup_name,
-        )
-
         # Step 1: Assume the IAM role
         credentials = cls.get_iam_credentials(
             role_arn=role_arn,
@@ -498,13 +595,33 @@ class AWSIAMAuthMixin:
             session_duration=session_duration,
         )
 
-        # Step 2: Get Redshift Serverless credentials
-        db_user, db_password = cls.generate_redshift_credentials(
-            credentials=credentials,
-            workgroup_name=workgroup_name,
-            db_name=db_name,
-            region=region,
-        )
+        # Step 2: Get Redshift credentials based on deployment type
+        if is_serverless:
+            assert workgroup_name is not None
+            logger.debug(
+                "Applying Redshift Serverless IAM authentication for workgroup 
%s",
+                workgroup_name,
+            )
+            db_user, db_password = cls.generate_redshift_credentials(
+                credentials=credentials,
+                workgroup_name=workgroup_name,
+                db_name=db_name,
+                region=region,
+            )
+        else:
+            assert cluster_identifier is not None
+            assert db_username is not None
+            logger.debug(
+                "Applying Redshift provisioned cluster IAM authentication for 
%s",
+                cluster_identifier,
+            )
+            db_user, db_password = cls.generate_redshift_cluster_credentials(
+                credentials=credentials,
+                cluster_identifier=cluster_identifier,
+                db_user=db_username,
+                db_name=db_name,
+                region=region,
+            )
 
         # Step 3: Update connection parameters
         connect_args = params.setdefault("connect_args", {})
diff --git a/tests/unit_tests/db_engine_specs/test_aws_iam.py 
b/tests/unit_tests/db_engine_specs/test_aws_iam.py
index 607b5a0fbf1..7b316f58aa6 100644
--- a/tests/unit_tests/db_engine_specs/test_aws_iam.py
+++ b/tests/unit_tests/db_engine_specs/test_aws_iam.py
@@ -646,7 +646,7 @@ def test_generate_redshift_credentials_client_error() -> 
None:
                 region="us-east-1",
             )
 
-        assert "Failed to get Redshift credentials" in str(exc_info.value)
+        assert "Failed to get Redshift Serverless credentials" in 
str(exc_info.value)
 
 
 def test_apply_redshift_iam_authentication() -> None:
@@ -755,3 +755,248 @@ def 
test_apply_redshift_iam_authentication_missing_db_name() -> None:
         )
 
     assert "db_name" in str(exc_info.value)
+
+
+def test_generate_redshift_cluster_credentials() -> None:
+    from superset.db_engine_specs.aws_iam import AWSIAMAuthMixin
+
+    credentials = {
+        "AccessKeyId": "ASIA...",
+        "SecretAccessKey": "secret...",
+        "SessionToken": "token...",
+    }
+
+    with patch("boto3.client") as mock_boto3_client:
+        mock_redshift = MagicMock()
+        mock_redshift.get_cluster_credentials.return_value = {
+            "DbUser": "IAM:superset_user",
+            "DbPassword": "redshift-cluster-temp-password",
+        }
+        mock_boto3_client.return_value = mock_redshift
+
+        db_user, db_password = 
AWSIAMAuthMixin.generate_redshift_cluster_credentials(
+            credentials=credentials,
+            cluster_identifier="my-redshift-cluster",
+            db_user="superset_user",
+            db_name="analytics",
+            region="us-east-1",
+        )
+
+        assert db_user == "IAM:superset_user"
+        assert db_password == "redshift-cluster-temp-password"  # noqa: S105
+        mock_boto3_client.assert_called_once_with(
+            "redshift",
+            region_name="us-east-1",
+            aws_access_key_id="ASIA...",
+            aws_secret_access_key="secret...",  # noqa: S106
+            aws_session_token="token...",  # noqa: S106
+        )
+        mock_redshift.get_cluster_credentials.assert_called_once_with(
+            ClusterIdentifier="my-redshift-cluster",
+            DbUser="superset_user",
+            DbName="analytics",
+            AutoCreate=False,
+        )
+
+
+def test_generate_redshift_cluster_credentials_with_auto_create() -> None:
+    from superset.db_engine_specs.aws_iam import AWSIAMAuthMixin
+
+    credentials = {
+        "AccessKeyId": "ASIA...",
+        "SecretAccessKey": "secret...",
+        "SessionToken": "token...",
+    }
+
+    with patch("boto3.client") as mock_boto3_client:
+        mock_redshift = MagicMock()
+        mock_redshift.get_cluster_credentials.return_value = {
+            "DbUser": "IAM:new_user",
+            "DbPassword": "temp-password",
+        }
+        mock_boto3_client.return_value = mock_redshift
+
+        AWSIAMAuthMixin.generate_redshift_cluster_credentials(
+            credentials=credentials,
+            cluster_identifier="my-cluster",
+            db_user="new_user",
+            db_name="dev",
+            region="us-west-2",
+            auto_create=True,
+        )
+
+        mock_redshift.get_cluster_credentials.assert_called_once_with(
+            ClusterIdentifier="my-cluster",
+            DbUser="new_user",
+            DbName="dev",
+            AutoCreate=True,
+        )
+
+
+def test_generate_redshift_cluster_credentials_client_error() -> None:
+    from botocore.exceptions import ClientError
+
+    from superset.db_engine_specs.aws_iam import AWSIAMAuthMixin
+
+    credentials = {
+        "AccessKeyId": "ASIA...",
+        "SecretAccessKey": "secret...",
+        "SessionToken": "token...",
+    }
+
+    with patch("boto3.client") as mock_boto3_client:
+        mock_redshift = MagicMock()
+        mock_redshift.get_cluster_credentials.side_effect = ClientError(
+            {"Error": {"Code": "ClusterNotFound", "Message": "Cluster not 
found"}},
+            "GetClusterCredentials",
+        )
+        mock_boto3_client.return_value = mock_redshift
+
+        with pytest.raises(SupersetSecurityException) as exc_info:
+            AWSIAMAuthMixin.generate_redshift_cluster_credentials(
+                credentials=credentials,
+                cluster_identifier="nonexistent-cluster",
+                db_user="superset_user",
+                db_name="dev",
+                region="us-east-1",
+            )
+
+        assert "Failed to get Redshift cluster credentials" in 
str(exc_info.value)
+
+
+def test_apply_redshift_iam_authentication_provisioned_cluster() -> None:
+    from superset.db_engine_specs.aws_iam import AWSIAMAuthMixin, AWSIAMConfig
+
+    mock_database = MagicMock()
+    mock_database.sqlalchemy_uri_decrypted = (
+        "redshift+psycopg2://[email protected]"
+        ".redshift.amazonaws.com:5439/analytics"
+    )
+
+    iam_config: AWSIAMConfig = {
+        "enabled": True,
+        "role_arn": "arn:aws:iam::123456789012:role/RedshiftRole",
+        "region": "us-east-1",
+        "cluster_identifier": "my-cluster",
+        "db_username": "superset_user",
+        "db_name": "analytics",
+    }
+
+    params: dict[str, Any] = {}
+
+    with (
+        patch.object(
+            AWSIAMAuthMixin,
+            "get_iam_credentials",
+            return_value={
+                "AccessKeyId": "ASIA...",
+                "SecretAccessKey": "secret...",
+                "SessionToken": "token...",
+            },
+        ) as mock_get_creds,
+        patch.object(
+            AWSIAMAuthMixin,
+            "generate_redshift_cluster_credentials",
+            return_value=("IAM:superset_user", "cluster-temp-password"),
+        ) as mock_gen_creds,
+    ):
+        AWSIAMAuthMixin._apply_redshift_iam_authentication(
+            mock_database, params, iam_config
+        )
+
+    mock_get_creds.assert_called_once_with(
+        role_arn="arn:aws:iam::123456789012:role/RedshiftRole",
+        region="us-east-1",
+        external_id=None,
+        session_duration=3600,
+    )
+
+    mock_gen_creds.assert_called_once_with(
+        credentials={
+            "AccessKeyId": "ASIA...",
+            "SecretAccessKey": "secret...",
+            "SessionToken": "token...",
+        },
+        cluster_identifier="my-cluster",
+        db_user="superset_user",
+        db_name="analytics",
+        region="us-east-1",
+    )
+
+    assert params["connect_args"]["password"] == "cluster-temp-password"  # 
noqa: S105
+    assert params["connect_args"]["user"] == "IAM:superset_user"
+    assert params["connect_args"]["sslmode"] == "verify-ca"
+
+
+def test_apply_redshift_iam_authentication_provisioned_missing_db_username() 
-> None:
+    from superset.db_engine_specs.aws_iam import AWSIAMAuthMixin, AWSIAMConfig
+
+    mock_database = MagicMock()
+    mock_database.sqlalchemy_uri_decrypted = 
"redshift+psycopg2://user@host:5439/dev"
+
+    iam_config: AWSIAMConfig = {
+        "enabled": True,
+        "role_arn": "arn:aws:iam::123456789012:role/RedshiftRole",
+        "region": "us-east-1",
+        "cluster_identifier": "my-cluster",
+        "db_name": "dev",
+        # Missing db_username - required for provisioned clusters
+    }
+
+    params: dict[str, Any] = {}
+
+    with pytest.raises(SupersetSecurityException) as exc_info:
+        AWSIAMAuthMixin._apply_redshift_iam_authentication(
+            mock_database, params, iam_config
+        )
+
+    assert "db_username" in str(exc_info.value)
+
+
+def test_apply_redshift_iam_authentication_both_workgroup_and_cluster() -> 
None:
+    from superset.db_engine_specs.aws_iam import AWSIAMAuthMixin, AWSIAMConfig
+
+    mock_database = MagicMock()
+    mock_database.sqlalchemy_uri_decrypted = 
"redshift+psycopg2://user@host:5439/dev"
+
+    iam_config: AWSIAMConfig = {
+        "enabled": True,
+        "role_arn": "arn:aws:iam::123456789012:role/RedshiftRole",
+        "region": "us-east-1",
+        "workgroup_name": "my-workgroup",
+        "cluster_identifier": "my-cluster",
+        "db_name": "dev",
+    }
+
+    params: dict[str, Any] = {}
+
+    with pytest.raises(SupersetSecurityException) as exc_info:
+        AWSIAMAuthMixin._apply_redshift_iam_authentication(
+            mock_database, params, iam_config
+        )
+
+    assert "cannot have both" in str(exc_info.value)
+
+
+def test_apply_redshift_iam_authentication_neither_workgroup_nor_cluster() -> 
None:
+    from superset.db_engine_specs.aws_iam import AWSIAMAuthMixin, AWSIAMConfig
+
+    mock_database = MagicMock()
+    mock_database.sqlalchemy_uri_decrypted = 
"redshift+psycopg2://user@host:5439/dev"
+
+    iam_config: AWSIAMConfig = {
+        "enabled": True,
+        "role_arn": "arn:aws:iam::123456789012:role/RedshiftRole",
+        "region": "us-east-1",
+        "db_name": "dev",
+        # Missing both workgroup_name and cluster_identifier
+    }
+
+    params: dict[str, Any] = {}
+
+    with pytest.raises(SupersetSecurityException) as exc_info:
+        AWSIAMAuthMixin._apply_redshift_iam_authentication(
+            mock_database, params, iam_config
+        )
+
+    assert "must include either workgroup_name" in str(exc_info.value)
diff --git a/tests/unit_tests/db_engine_specs/test_redshift_iam.py 
b/tests/unit_tests/db_engine_specs/test_redshift_iam.py
index 3f0da7e734e..9e17259f6d6 100644
--- a/tests/unit_tests/db_engine_specs/test_redshift_iam.py
+++ b/tests/unit_tests/db_engine_specs/test_redshift_iam.py
@@ -243,3 +243,140 @@ def test_redshift_mask_encrypted_extra() -> None:
     assert masked_config["aws_iam"]["region"] == "us-east-1"
     assert masked_config["aws_iam"]["workgroup_name"] == "my-workgroup"
     assert masked_config["aws_iam"]["db_name"] == "dev"
+
+
+def test_redshift_update_params_with_iam_provisioned_cluster() -> None:
+    from superset.db_engine_specs.aws_iam import AWSIAMAuthMixin
+    from superset.db_engine_specs.redshift import RedshiftEngineSpec
+
+    database = MagicMock()
+    database.encrypted_extra = json.dumps(
+        {
+            "aws_iam": {
+                "enabled": True,
+                "role_arn": "arn:aws:iam::123456789012:role/RedshiftRole",
+                "region": "us-east-1",
+                "cluster_identifier": "my-redshift-cluster",
+                "db_username": "superset_user",
+                "db_name": "analytics",
+            }
+        }
+    )
+    database.sqlalchemy_uri_decrypted = (
+        "redshift+psycopg2://[email protected]"
+        ".redshift.amazonaws.com:5439/analytics"
+    )
+
+    params: dict[str, Any] = {}
+
+    with (
+        patch.object(
+            AWSIAMAuthMixin,
+            "get_iam_credentials",
+            return_value={
+                "AccessKeyId": "ASIA...",
+                "SecretAccessKey": "secret...",
+                "SessionToken": "token...",
+            },
+        ),
+        patch.object(
+            AWSIAMAuthMixin,
+            "generate_redshift_cluster_credentials",
+            return_value=("IAM:superset_user", "cluster-temp-password"),
+        ),
+    ):
+        RedshiftEngineSpec.update_params_from_encrypted_extra(database, params)
+
+    assert "connect_args" in params
+    assert params["connect_args"]["password"] == "cluster-temp-password"  # 
noqa: S105
+    assert params["connect_args"]["user"] == "IAM:superset_user"
+    assert params["connect_args"]["sslmode"] == "verify-ca"
+
+
+def test_redshift_update_params_provisioned_cluster_with_external_id() -> None:
+    from superset.db_engine_specs.aws_iam import AWSIAMAuthMixin
+    from superset.db_engine_specs.redshift import RedshiftEngineSpec
+
+    database = MagicMock()
+    database.encrypted_extra = json.dumps(
+        {
+            "aws_iam": {
+                "enabled": True,
+                "role_arn": 
"arn:aws:iam::222222222222:role/CrossAccountRedshift",
+                "external_id": "superset-prod-12345",
+                "region": "us-west-2",
+                "cluster_identifier": "prod-cluster",
+                "db_username": "analytics_user",
+                "db_name": "prod_db",
+                "session_duration": 1800,
+            }
+        }
+    )
+    database.sqlalchemy_uri_decrypted = (
+        "redshift+psycopg2://[email protected]"
+        ".redshift.amazonaws.com:5439/prod_db"
+    )
+
+    params: dict[str, Any] = {}
+
+    with (
+        patch.object(
+            AWSIAMAuthMixin,
+            "get_iam_credentials",
+            return_value={
+                "AccessKeyId": "ASIA...",
+                "SecretAccessKey": "secret...",
+                "SessionToken": "token...",
+            },
+        ) as mock_get_creds,
+        patch.object(
+            AWSIAMAuthMixin,
+            "generate_redshift_cluster_credentials",
+            return_value=("IAM:analytics_user", "cluster-temp-password"),
+        ),
+    ):
+        RedshiftEngineSpec.update_params_from_encrypted_extra(database, params)
+
+    mock_get_creds.assert_called_once_with(
+        role_arn="arn:aws:iam::222222222222:role/CrossAccountRedshift",
+        region="us-west-2",
+        external_id="superset-prod-12345",
+        session_duration=1800,
+    )
+
+
+def test_redshift_mask_encrypted_extra_provisioned_cluster() -> None:
+    from superset.db_engine_specs.redshift import RedshiftEngineSpec
+
+    encrypted_extra = json.dumps(
+        {
+            "aws_iam": {
+                "enabled": True,
+                "role_arn": "arn:aws:iam::123456789012:role/SecretRole",
+                "external_id": "secret-external-id-12345",
+                "region": "us-east-1",
+                "cluster_identifier": "my-cluster",
+                "db_username": "superset_user",
+                "db_name": "analytics",
+            }
+        }
+    )
+
+    masked = RedshiftEngineSpec.mask_encrypted_extra(encrypted_extra)
+    assert masked is not None
+
+    masked_config = json.loads(masked)
+
+    # role_arn and external_id should be masked
+    assert (
+        masked_config["aws_iam"]["role_arn"]
+        != "arn:aws:iam::123456789012:role/SecretRole"
+    )
+    assert masked_config["aws_iam"]["external_id"] != 
"secret-external-id-12345"
+
+    # Non-sensitive fields should remain unchanged
+    assert masked_config["aws_iam"]["enabled"] is True
+    assert masked_config["aws_iam"]["region"] == "us-east-1"
+    assert masked_config["aws_iam"]["cluster_identifier"] == "my-cluster"
+    assert masked_config["aws_iam"]["db_username"] == "superset_user"
+    assert masked_config["aws_iam"]["db_name"] == "analytics"

Reply via email to