Taragolis commented on code in PR #35897:
URL: https://github.com/apache/airflow/pull/35897#discussion_r1407207325


##########
airflow/providers/amazon/aws/hooks/redshift_sql.py:
##########
@@ -81,7 +81,9 @@ def _get_conn_params(self) -> dict[str, str | int]:
         conn_params: dict[str, str | int] = {}
 
         if conn.extra_dejson.get("iam", False):
-            conn.login, conn.password, conn.port = self.get_iam_token(conn)
+            conn.login, conn.password, conn.port = self.get_iam_token(
+                conn, is_serverless=conn.extra_dejson.get("is_serverless", False)
+            )

Review Comment:
   Since we pass the entire Connection object to the `get_iam_token` method, I guess we do not need the `is_serverless` argument and could resolve it inside `get_iam_token` instead.
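
   A minimal sketch of what I mean, assuming the extras keys stay as introduced in this PR (both branches elided here, the hunks below show their contents):

   ```python
   def get_iam_token(self, conn: Connection) -> tuple[str, str, int]:
       """Retrieve a temporary password to connect to Redshift."""
       # Resolve the flag from the connection extras instead of taking it as an
       # argument, since the whole Connection object is already passed in.
       is_serverless = conn.extra_dejson.get("is_serverless", False)
       port = conn.port or 5439
       if is_serverless:
           ...  # Redshift Serverless branch added by this PR
       else:
           ...  # existing provisioned-cluster branch
   ```

   Then the call site in `_get_conn_params` could stay exactly as it was before: `conn.login, conn.password, conn.port = self.get_iam_token(conn)`.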



##########
airflow/providers/amazon/aws/hooks/redshift_sql.py:
##########
@@ -96,28 +98,47 @@ def _get_conn_params(self) -> dict[str, str | int]:
 
         return conn_params
 
-    def get_iam_token(self, conn: Connection) -> tuple[str, str, int]:
+    def get_iam_token(self, conn: Connection, is_serverless: bool = False) -> tuple[str, str, int]:
         """Retrieve a temporary password to connect to Redshift.
 
         Port is required. If none is provided, default is used for each service.
         """
         port = conn.port or 5439
-        # Pull the custer-identifier from the beginning of the Redshift URL
-        # ex. my-cluster.ccdre4hpd39h.us-east-1.redshift.amazonaws.com returns my-cluster
-        cluster_identifier = conn.extra_dejson.get("cluster_identifier")
-        if not cluster_identifier:
-            if conn.host:
-                cluster_identifier = conn.host.split(".", 1)[0]
-            else:
-                raise AirflowException("Please set cluster_identifier or host in redshift connection.")
-        redshift_client = AwsBaseHook(aws_conn_id=self.aws_conn_id, client_type="redshift").conn
-        # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/redshift.html#Redshift.Client.get_cluster_credentials
-        cluster_creds = redshift_client.get_cluster_credentials(
-            DbUser=conn.login,
-            DbName=conn.schema,
-            ClusterIdentifier=cluster_identifier,
-            AutoCreate=False,
-        )
+        if is_serverless:
+            serverless_work_group = conn.extra_dejson.get("serverless_work_group")
+            if not serverless_work_group:
+                raise AirflowException(
+                    "Please set serverless_work_group in redshift connection to use IAM with Redshift Serverless."
+                )
+            serverless_token_duration_seconds = conn.extra_dejson.get(
+                "serverless_token_duration_seconds", 3600
+            )

Review Comment:
   But it is a general problem: how do we avoid forgetting to document parameters that come from the free-form Connection Extra?
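
   As one possible mitigation (only a sketch, the exact wording is hypothetical), the supported extras could at least be enumerated in the hook's class docstring, so they show up in the rendered API docs and the list lives next to the code that reads them:

   ```python
   class RedshiftSQLHook(DbApiHook):
       """Execute statements against Amazon Redshift.

       The following parameters are read from the connection ``extra`` field:

       * ``iam`` - use IAM-based temporary credentials instead of a stored password
       * ``cluster_identifier`` - provisioned cluster id; otherwise derived from the host
       * ``is_serverless`` - the endpoint is a Redshift Serverless workgroup
       * ``serverless_work_group`` - workgroup name, required when ``is_serverless`` is set
       * ``serverless_token_duration_seconds`` - token lifetime in seconds, defaults to 3600
       """
   ```

   That still does not enforce anything, but it makes an undocumented extra much easier to spot in review.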



##########
airflow/providers/amazon/aws/hooks/redshift_sql.py:
##########
@@ -96,28 +98,47 @@ def _get_conn_params(self) -> dict[str, str | int]:
 
         return conn_params
 
-    def get_iam_token(self, conn: Connection) -> tuple[str, str, int]:
+    def get_iam_token(self, conn: Connection, is_serverless: bool = False) -> tuple[str, str, int]:
         """Retrieve a temporary password to connect to Redshift.
 
         Port is required. If none is provided, default is used for each service.
         """
         port = conn.port or 5439
-        # Pull the custer-identifier from the beginning of the Redshift URL
-        # ex. my-cluster.ccdre4hpd39h.us-east-1.redshift.amazonaws.com returns my-cluster
-        cluster_identifier = conn.extra_dejson.get("cluster_identifier")
-        if not cluster_identifier:
-            if conn.host:
-                cluster_identifier = conn.host.split(".", 1)[0]
-            else:
-                raise AirflowException("Please set cluster_identifier or host in redshift connection.")
-        redshift_client = AwsBaseHook(aws_conn_id=self.aws_conn_id, client_type="redshift").conn
-        # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/redshift.html#Redshift.Client.get_cluster_credentials
-        cluster_creds = redshift_client.get_cluster_credentials(
-            DbUser=conn.login,
-            DbName=conn.schema,
-            ClusterIdentifier=cluster_identifier,
-            AutoCreate=False,
-        )
+        if is_serverless:
+            serverless_work_group = conn.extra_dejson.get("serverless_work_group")
+            if not serverless_work_group:
+                raise AirflowException(
+                    "Please set serverless_work_group in redshift connection to use IAM with Redshift Serverless."
+                )
+            serverless_token_duration_seconds = conn.extra_dejson.get(
+                "serverless_token_duration_seconds", 3600
+            )

Review Comment:
   This part also makes me sad: there is no info in the documentation about this parameter, so end users might learn about this setting only by opening the source code 😿
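
   For context, this is roughly how I would expect the serverless branch to consume these extras; just a sketch, assuming the boto3 `redshift-serverless` client and its `get_credentials` call (the hunk above only shows how the extras are read, not how they are used):

   ```python
   # Sketch of the serverless counterpart to get_cluster_credentials above,
   # inside get_iam_token, using the extras resolved in the hunk.
   serverless_client = AwsBaseHook(
       aws_conn_id=self.aws_conn_id, client_type="redshift-serverless"
   ).conn
   # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/redshift-serverless.html
   serverless_creds = serverless_client.get_credentials(
       dbName=conn.schema,
       workgroupName=serverless_work_group,
       durationSeconds=serverless_token_duration_seconds,
   )
   login = serverless_creds["dbUser"]
   token = serverless_creds["dbPassword"]
   ```

   Whatever the final shape is, both `serverless_work_group` and `serverless_token_duration_seconds` (including its 3600 default) should end up in the Redshift connection documentation together with the existing `iam` and `cluster_identifier` extras.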



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to