Taragolis commented on code in PR #35897:
URL: https://github.com/apache/airflow/pull/35897#discussion_r1407207325
##########
airflow/providers/amazon/aws/hooks/redshift_sql.py:
##########
@@ -81,7 +81,9 @@ def _get_conn_params(self) -> dict[str, str | int]:
conn_params: dict[str, str | int] = {}
if conn.extra_dejson.get("iam", False):
- conn.login, conn.password, conn.port = self.get_iam_token(conn)
+ conn.login, conn.password, conn.port = self.get_iam_token(
+ conn, is_serverless=conn.extra_dejson.get("is_serverless",
False)
+ )
Review Comment:
Since we pass the entire Connection object into the `get_iam_token` method, I
think we don't need a separate `is_serverless` argument; it could be resolved
inside `get_iam_token` itself.
##########
airflow/providers/amazon/aws/hooks/redshift_sql.py:
##########
@@ -96,28 +98,47 @@ def _get_conn_params(self) -> dict[str, str | int]:
return conn_params
- def get_iam_token(self, conn: Connection) -> tuple[str, str, int]:
+ def get_iam_token(self, conn: Connection, is_serverless: bool = False) ->
tuple[str, str, int]:
"""Retrieve a temporary password to connect to Redshift.
Port is required. If none is provided, default is used for each
service.
"""
port = conn.port or 5439
- # Pull the custer-identifier from the beginning of the Redshift URL
- # ex. my-cluster.ccdre4hpd39h.us-east-1.redshift.amazonaws.com returns
my-cluster
- cluster_identifier = conn.extra_dejson.get("cluster_identifier")
- if not cluster_identifier:
- if conn.host:
- cluster_identifier = conn.host.split(".", 1)[0]
- else:
- raise AirflowException("Please set cluster_identifier or host
in redshift connection.")
- redshift_client = AwsBaseHook(aws_conn_id=self.aws_conn_id,
client_type="redshift").conn
- #
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/redshift.html#Redshift.Client.get_cluster_credentials
- cluster_creds = redshift_client.get_cluster_credentials(
- DbUser=conn.login,
- DbName=conn.schema,
- ClusterIdentifier=cluster_identifier,
- AutoCreate=False,
- )
+ if is_serverless:
+ serverless_work_group =
conn.extra_dejson.get("serverless_work_group")
+ if not serverless_work_group:
+ raise AirflowException(
+ "Please set serverless_work_group in redshift connection
to use IAM with Redshift Serverless."
+ )
+ serverless_token_duration_seconds = conn.extra_dejson.get(
+ "serverless_token_duration_seconds", 3600
+ )
Review Comment:
But it's a general problem: how do we avoid forgetting to document parameters
that are read from the free-form Connection Extra?
##########
airflow/providers/amazon/aws/hooks/redshift_sql.py:
##########
@@ -96,28 +98,47 @@ def _get_conn_params(self) -> dict[str, str | int]:
return conn_params
- def get_iam_token(self, conn: Connection) -> tuple[str, str, int]:
+ def get_iam_token(self, conn: Connection, is_serverless: bool = False) ->
tuple[str, str, int]:
"""Retrieve a temporary password to connect to Redshift.
Port is required. If none is provided, default is used for each
service.
"""
port = conn.port or 5439
- # Pull the custer-identifier from the beginning of the Redshift URL
- # ex. my-cluster.ccdre4hpd39h.us-east-1.redshift.amazonaws.com returns
my-cluster
- cluster_identifier = conn.extra_dejson.get("cluster_identifier")
- if not cluster_identifier:
- if conn.host:
- cluster_identifier = conn.host.split(".", 1)[0]
- else:
- raise AirflowException("Please set cluster_identifier or host
in redshift connection.")
- redshift_client = AwsBaseHook(aws_conn_id=self.aws_conn_id,
client_type="redshift").conn
- #
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/redshift.html#Redshift.Client.get_cluster_credentials
- cluster_creds = redshift_client.get_cluster_credentials(
- DbUser=conn.login,
- DbName=conn.schema,
- ClusterIdentifier=cluster_identifier,
- AutoCreate=False,
- )
+ if is_serverless:
+ serverless_work_group =
conn.extra_dejson.get("serverless_work_group")
+ if not serverless_work_group:
+ raise AirflowException(
+ "Please set serverless_work_group in redshift connection
to use IAM with Redshift Serverless."
+ )
+ serverless_token_duration_seconds = conn.extra_dejson.get(
+ "serverless_token_duration_seconds", 3600
+ )
Review Comment:
This part also makes me sad: there is no information in the documentation about
this parameter, so end users can only learn about this setting by reading
the source code 😿
##########
airflow/providers/amazon/aws/hooks/redshift_sql.py:
##########
@@ -96,28 +98,47 @@ def _get_conn_params(self) -> dict[str, str | int]:
return conn_params
- def get_iam_token(self, conn: Connection) -> tuple[str, str, int]:
+ def get_iam_token(self, conn: Connection, is_serverless: bool = False) ->
tuple[str, str, int]:
"""Retrieve a temporary password to connect to Redshift.
Port is required. If none is provided, default is used for each
service.
"""
port = conn.port or 5439
- # Pull the custer-identifier from the beginning of the Redshift URL
- # ex. my-cluster.ccdre4hpd39h.us-east-1.redshift.amazonaws.com returns
my-cluster
- cluster_identifier = conn.extra_dejson.get("cluster_identifier")
- if not cluster_identifier:
- if conn.host:
- cluster_identifier = conn.host.split(".", 1)[0]
- else:
- raise AirflowException("Please set cluster_identifier or host
in redshift connection.")
- redshift_client = AwsBaseHook(aws_conn_id=self.aws_conn_id,
client_type="redshift").conn
- #
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/redshift.html#Redshift.Client.get_cluster_credentials
- cluster_creds = redshift_client.get_cluster_credentials(
- DbUser=conn.login,
- DbName=conn.schema,
- ClusterIdentifier=cluster_identifier,
- AutoCreate=False,
- )
+ if is_serverless:
+ serverless_work_group =
conn.extra_dejson.get("serverless_work_group")
+ if not serverless_work_group:
+ raise AirflowException(
+ "Please set serverless_work_group in redshift connection
to use IAM with Redshift Serverless."
+ )
+ serverless_token_duration_seconds = conn.extra_dejson.get(
+ "serverless_token_duration_seconds", 3600
+ )
Review Comment:
This part also makes me sad: there is no information in the documentation about
this parameter, so end users can only learn about this setting by reading
the source code 😿
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]