turbaszek commented on a change in pull request #11227:
URL: https://github.com/apache/airflow/pull/11227#discussion_r499783147
##########
File path: airflow/providers/amazon/aws/transfers/redshift_to_s3.py
##########
@@ -104,29 +107,50 @@ def __init__( # pylint: disable=too-many-arguments
'HEADER',
]
- def execute(self, context):
- postgres_hook = PostgresHook(postgres_conn_id=self.redshift_conn_id)
- s3_hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify)
+ def _build_unload_query(self, credentials, select_query, s3_key,
unload_options):
+ if credentials.token and credentials.access_key.startswith("ASIA"):
+ self.log.debug("STS token found in credentials, including it in
the UNLOAD command")
+ # these credentials are obtained from AWS STS
+ # so the token must be included in the CREDENTIALS clause
+ #
https://docs.aws.amazon.com/redshift/latest/dg/copy-parameters-authorization.html#copy-credentials
+ credentials_line = (
+
"aws_access_key_id={access_key};aws_secret_access_key={secret_key};token={token}".format(
+ access_key=credentials.access_key,
+ secret_key=credentials.secret_key,
+ token=credentials.token,
+ )
+ )
- credentials = s3_hook.get_credentials()
- unload_options = '\n\t\t\t'.join(self.unload_options)
- s3_key = '{}/{}_'.format(self.s3_key, self.table) if
self.table_as_file_name else self.s3_key
- select_query = "SELECT * FROM
{schema}.{table}".format(schema=self.schema, table=self.table)
- unload_query = """
+ else:
+ credentials_line =
"aws_access_key_id={access_key};aws_secret_access_key={secret_key}".format(
+ access_key=credentials.access_key,
secret_key=credentials.secret_key
+ )
+
+ return """
UNLOAD ('{select_query}')
TO 's3://{s3_bucket}/{s3_key}'
with credentials
-
'aws_access_key_id={access_key};aws_secret_access_key={secret_key}'
+ '{credentials_line}'
{unload_options};
""".format(
select_query=select_query,
s3_bucket=self.s3_bucket,
s3_key=s3_key,
- access_key=credentials.access_key,
- secret_key=credentials.secret_key,
+ credentials_line=credentials_line,
unload_options=unload_options,
)
+ def execute(self, context):
+ postgres_hook = PostgresHook(postgres_conn_id=self.redshift_conn_id)
+ s3_hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify)
+
+ credentials = s3_hook.get_credentials()
+ unload_options = '\n\t\t\t'.join(self.unload_options)
+ s3_key = '{}/{}_'.format(self.s3_key, self.table) if
self.table_as_file_name else self.s3_key
+ select_query = "SELECT * FROM
{schema}.{table}".format(schema=self.schema, table=self.table)
Review comment:
This looks good to me, but how about using f-strings for all of these changes?
That way the code will be more readable :)
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]