ivica-k commented on a change in pull request #11227:
URL: https://github.com/apache/airflow/pull/11227#discussion_r499802779
##########
File path: airflow/providers/amazon/aws/transfers/redshift_to_s3.py
##########
@@ -104,29 +107,50 @@ def __init__( # pylint: disable=too-many-arguments
'HEADER',
]
- def execute(self, context):
- postgres_hook = PostgresHook(postgres_conn_id=self.redshift_conn_id)
- s3_hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify)
+ def _build_unload_query(self, credentials, select_query, s3_key,
unload_options):
+ if credentials.token and credentials.access_key.startswith("ASIA"):
+ self.log.debug("STS token found in credentials, including it in
the UNLOAD command")
+ # these credentials are obtained from AWS STS
+ # so the token must be included in the CREDENTIALS clause
+ #
https://docs.aws.amazon.com/redshift/latest/dg/copy-parameters-authorization.html#copy-credentials
+ credentials_line = (
+
"aws_access_key_id={access_key};aws_secret_access_key={secret_key};token={token}".format(
+ access_key=credentials.access_key,
+ secret_key=credentials.secret_key,
+ token=credentials.token,
+ )
+ )
- credentials = s3_hook.get_credentials()
- unload_options = '\n\t\t\t'.join(self.unload_options)
- s3_key = '{}/{}_'.format(self.s3_key, self.table) if
self.table_as_file_name else self.s3_key
- select_query = "SELECT * FROM
{schema}.{table}".format(schema=self.schema, table=self.table)
- unload_query = """
+ else:
+ credentials_line =
"aws_access_key_id={access_key};aws_secret_access_key={secret_key}".format(
+ access_key=credentials.access_key,
secret_key=credentials.secret_key
+ )
+
+ return """
UNLOAD ('{select_query}')
TO 's3://{s3_bucket}/{s3_key}'
with credentials
-
'aws_access_key_id={access_key};aws_secret_access_key={secret_key}'
+ '{credentials_line}'
{unload_options};
""".format(
select_query=select_query,
s3_bucket=self.s3_bucket,
s3_key=s3_key,
- access_key=credentials.access_key,
- secret_key=credentials.secret_key,
+ credentials_line=credentials_line,
unload_options=unload_options,
)
+ def execute(self, context):
+ postgres_hook = PostgresHook(postgres_conn_id=self.redshift_conn_id)
+ s3_hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify)
+
+ credentials = s3_hook.get_credentials()
+ unload_options = '\n\t\t\t'.join(self.unload_options)
+ s3_key = '{}/{}_'.format(self.s3_key, self.table) if
self.table_as_file_name else self.s3_key
+ select_query = "SELECT * FROM
{schema}.{table}".format(schema=self.schema, table=self.table)
Review comment:
I personally agree with that. The only reason I did not use
f-strings is that I did not want to introduce too many changes in one go.
I'll push the change shortly, thanks!
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]