feluelle commented on a change in pull request #6309: [AIRFLOW-3783][AIRFLOW-5395] Switch to HEADER arg to unload data from Redshift to S3 URL: https://github.com/apache/airflow/pull/6309#discussion_r334235150
########## File path: airflow/operators/redshift_to_s3_operator.py ########## @@ -85,52 +93,16 @@ def __init__( self.autocommit = autocommit self.include_header = include_header - if self.include_header and 'PARALLEL OFF' not in [uo.upper().strip() for uo in self.unload_options]: - self.unload_options = list(self.unload_options) + ['PARALLEL OFF', ] + if self.include_header and 'HEADER' not in [uo.upper().strip() for uo in self.unload_options]: + self.unload_options = list(self.unload_options) + ['HEADER', ] def execute(self, context): - self.hook = PostgresHook(postgres_conn_id=self.redshift_conn_id) - self.s3 = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify) - credentials = self.s3.get_credentials() - unload_options = '\n\t\t\t'.join(self.unload_options) - - if self.include_header: - self.log.info("Retrieving headers from %s.%s...", - self.schema, self.table) - - columns_query = """SELECT column_name - FROM information_schema.columns - WHERE table_schema = '{schema}' - AND table_name = '{table}' - ORDER BY ordinal_position - """.format(schema=self.schema, - table=self.table) - - cursor = self.hook.get_conn().cursor() - cursor.execute(columns_query) - rows = cursor.fetchall() - columns = [row[0] for row in rows] - column_names = ', '.join("{0}".format(c) for c in columns) - column_headers = ', '.join("\\'{0}\\'".format(c) for c in columns) - column_castings = ', '.join("CAST({0} AS text) AS {0}".format(c) - for c in columns) - - select_query = """SELECT {column_names} FROM - (SELECT 2 sort_order, {column_castings} - FROM {schema}.{table} - UNION ALL - SELECT 1 sort_order, {column_headers}) - ORDER BY sort_order"""\ - .format(column_names=column_names, - column_castings=column_castings, - column_headers=column_headers, - schema=self.schema, - table=self.table) - else: - select_query = "SELECT * FROM {schema}.{table}"\ - .format(schema=self.schema, - table=self.table) + postgres_hook = PostgresHook(postgres_conn_id=self.redshift_conn_id) + s3_hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify) + credentials = s3_hook.get_credentials() + unload_options = '\n\t\t\t'.join(self.unload_options) + select_query = "SELECT * FROM {schema}.{table}".format(schema=self.schema, table=self.table) Review comment: ..thinking about it, I think it's easier to specify `columns` and `order_by` in the `__init__`, because if we want to add `select_query` to it we need to differentiate between using `select_query` or `schema` & `table` - that's more complex. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services