Repository: incubator-airflow Updated Branches: refs/heads/master 32c5f445e -> a14804310
[AIRFLOW-2254] Put header as first row in unload Currently, data is ordered by first column in descending order Header row comes as first only if the first column is integer This fix puts header as first row regardless of first column data type Closes #3180 from sathyaprakashg/AIRFLOW-2254 Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/a1480431 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/a1480431 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/a1480431 Branch: refs/heads/master Commit: a148043107f147ce7d3617308f119be27810ec5a Parents: 32c5f44 Author: Sathyaprakash Govindasamy <[email protected]> Authored: Mon Apr 16 10:21:22 2018 +0200 Committer: Fokko Driesprong <[email protected]> Committed: Mon Apr 16 10:21:22 2018 +0200 ---------------------------------------------------------------------- UPDATING.md | 6 ++ airflow/operators/redshift_to_s3_operator.py | 77 +++++++++++++------- tests/operators/test_redshift_to_s3_operator.py | 43 +++++++---- 3 files changed, 85 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a1480431/UPDATING.md ---------------------------------------------------------------------- diff --git a/UPDATING.md b/UPDATING.md index 0abccff..f50e598 100644 --- a/UPDATING.md +++ b/UPDATING.md @@ -58,6 +58,12 @@ Dataflow job labeling is now supported in Dataflow{Java,Python}Operator with a d "airflow-version" label, please upgrade your google-cloud-dataflow or apache-beam version to 2.2.0 or greater. +### Redshift to S3 Operator +With Airflow 1.9 or lower, Unload operation always included header row. In order to include header row, +we need to turn off parallel unload. It is preferred to perform unload operation using all nodes so that it is +faster for larger tables. 
So, a parameter called `include_header` has been added, with a default of False. +The header row is added only if this parameter is set to True; in that case parallel unload is automatically turned off (`PARALLEL OFF`). + ### Google cloud connection string With Airflow 1.9 or lower there where two connection strings for the Google Cloud operators, both `google_cloud_storage_default` and `google_cloud_default`. This can be confusing and therefore the `google_cloud_storage_default` connection id has been replaced with `google_cloud_default` to make the connection id consistent across Airflow. http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a1480431/airflow/operators/redshift_to_s3_operator.py ---------------------------------------------------------------------- diff --git a/airflow/operators/redshift_to_s3_operator.py b/airflow/operators/redshift_to_s3_operator.py index c3aa4dc..6c2998a 100644 --- a/airflow/operators/redshift_to_s3_operator.py +++ b/airflow/operators/redshift_to_s3_operator.py @@ -58,6 +58,7 @@ class RedshiftToS3Transfer(BaseOperator): unload_options=tuple(), autocommit=False, parameters=None, + include_header=False, *args, **kwargs): super(RedshiftToS3Transfer, self).__init__(*args, **kwargs) self.schema = schema @@ -69,6 +70,11 @@ class RedshiftToS3Transfer(BaseOperator): self.unload_options = unload_options self.autocommit = autocommit self.parameters = parameters + self.include_header = include_header + + if self.include_header and \ + 'PARALLEL OFF' not in [uo.upper().strip() for uo in unload_options]: + self.unload_options = list(unload_options) + ['PARALLEL OFF', ] def execute(self, context): self.hook = PostgresHook(postgres_conn_id=self.redshift_conn_id) @@ -76,35 +82,56 @@ class RedshiftToS3Transfer(BaseOperator): credentials = self.s3.get_credentials() unload_options = '\n\t\t\t'.join(self.unload_options) - self.log.info("Retrieving headers from %s.%s...", self.schema, self.table) + if self.include_header: + 
self.log.info("Retrieving headers from %s.%s...", + self.schema, self.table) + + columns_query = """SELECT column_name + FROM information_schema.columns + WHERE table_schema = '{schema}' + AND table_name = '{table}' + ORDER BY ordinal_position + """.format(schema=self.schema, + table=self.table) - columns_query = """SELECT column_name - FROM information_schema.columns - WHERE table_schema = '{0}' - AND table_name = '{1}' - ORDER BY ordinal_position - """.format(self.schema, self.table) + cursor = self.hook.get_conn().cursor() + cursor.execute(columns_query) + rows = cursor.fetchall() + columns = [row[0] for row in rows] + column_names = ', '.join("{0}".format(c) for c in columns) + column_headers = ', '.join("\\'{0}\\'".format(c) for c in columns) + column_castings = ', '.join("CAST({0} AS text) AS {0}".format(c) + for c in columns) - cursor = self.hook.get_conn().cursor() - cursor.execute(columns_query) - rows = cursor.fetchall() - columns = [row[0] for row in rows] - column_names = ', '.join("\\'{0}\\'".format(c) for c in columns) - column_castings = ', '.join("CAST({0} AS text) AS {0}".format(c) - for c in columns) + select_query = """SELECT {column_names} FROM + (SELECT 2 sort_order, {column_castings} + FROM {schema}.{table} + UNION ALL + SELECT 1 sort_order, {column_headers}) + ORDER BY sort_order"""\ + .format(column_names=column_names, + column_castings=column_castings, + column_headers=column_headers, + schema=self.schema, + table=self.table) + else: + select_query = "SELECT * FROM {schema}.{table}"\ + .format(schema=self.schema, + table=self.table) unload_query = """ - UNLOAD ('SELECT {0} - UNION ALL - SELECT {1} FROM {2}.{3} - ORDER BY 1 DESC') - TO 's3://{4}/{5}/{3}_' - with - credentials 'aws_access_key_id={6};aws_secret_access_key={7}' - {8}; - """.format(column_names, column_castings, self.schema, self.table, - self.s3_bucket, self.s3_key, credentials.access_key, - credentials.secret_key, unload_options) + UNLOAD ('{select_query}') + TO 
's3://{s3_bucket}/{s3_key}/{table}_' + with credentials + 'aws_access_key_id={access_key};aws_secret_access_key={secret_key}' + {unload_options}; + """.format(select_query=select_query, + table=self.table, + s3_bucket=self.s3_bucket, + s3_key=self.s3_key, + access_key=credentials.access_key, + secret_key=credentials.secret_key, + unload_options=unload_options) self.log.info('Executing UNLOAD command...') self.hook.run(unload_query, self.autocommit) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a1480431/tests/operators/test_redshift_to_s3_operator.py ---------------------------------------------------------------------- diff --git a/tests/operators/test_redshift_to_s3_operator.py b/tests/operators/test_redshift_to_s3_operator.py index 06db19c..e214f3d 100644 --- a/tests/operators/test_redshift_to_s3_operator.py +++ b/tests/operators/test_redshift_to_s3_operator.py @@ -45,7 +45,7 @@ class TestRedshiftToS3Transfer(unittest.TestCase): table = "table" s3_bucket = "bucket" s3_key = "key" - unload_options = "" + unload_options = ('PARALLEL OFF',) t = RedshiftToS3Transfer( schema=schema, @@ -53,33 +53,44 @@ class TestRedshiftToS3Transfer(unittest.TestCase): s3_bucket=s3_bucket, s3_key=s3_key, unload_options=unload_options, + include_header=True, redshift_conn_id="redshift_conn_id", aws_conn_id="aws_conn_id", task_id="task_id", dag=None) t.execute(None) + unload_options = '\n\t\t\t'.join(unload_options) + columns_query = """ SELECT column_name FROM information_schema.columns - WHERE table_schema = '{0}' - AND table_name = '{1}' + WHERE table_schema = '{schema}' + AND table_name = '{table}' ORDER BY ordinal_position - """.format(schema, table) + """.format(schema=schema, + table=table) unload_query = """ - UNLOAD ('SELECT \\'{0}\\' - UNION ALL - SELECT CAST({0} AS text) AS {0} - FROM {1}.{2} - ORDER BY 1 DESC') - TO 's3://{3}/{4}/{2}_' - with credentials - 'aws_access_key_id={5};aws_secret_access_key={6}' - {7}; - """.format(column_name, schema, table, - 
s3_bucket, s3_key, access_key, - secret_key, unload_options) + UNLOAD ('SELECT {column_name} FROM + (SELECT 2 sort_order, + CAST({column_name} AS text) AS {column_name} + FROM {schema}.{table} + UNION ALL + SELECT 1 sort_order, \\'{column_name}\\') + ORDER BY sort_order') + TO 's3://{s3_bucket}/{s3_key}/{table}_' + with credentials + 'aws_access_key_id={access_key};aws_secret_access_key={secret_key}' + {unload_options}; + """.format(column_name=column_name, + schema=schema, + table=table, + s3_bucket=s3_bucket, + s3_key=s3_key, + access_key=access_key, + secret_key=secret_key, + unload_options=unload_options) def _trim(s): return re.sub("\s+", " ", s.strip())
