Repository: incubator-airflow
Updated Branches:
  refs/heads/master 32c5f445e -> a14804310


[AIRFLOW-2254] Put header as first row in unload

Currently, data is ordered by the first column in
descending order.
The header row comes first only if the first column
is an integer.
This fix puts the header as the first row regardless
of the first column's data type.

Closes #3180 from sathyaprakashg/AIRFLOW-2254


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/a1480431
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/a1480431
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/a1480431

Branch: refs/heads/master
Commit: a148043107f147ce7d3617308f119be27810ec5a
Parents: 32c5f44
Author: Sathyaprakash Govindasamy <[email protected]>
Authored: Mon Apr 16 10:21:22 2018 +0200
Committer: Fokko Driesprong <[email protected]>
Committed: Mon Apr 16 10:21:22 2018 +0200

----------------------------------------------------------------------
 UPDATING.md                                     |  6 ++
 airflow/operators/redshift_to_s3_operator.py    | 77 +++++++++++++-------
 tests/operators/test_redshift_to_s3_operator.py | 43 +++++++----
 3 files changed, 85 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a1480431/UPDATING.md
----------------------------------------------------------------------
diff --git a/UPDATING.md b/UPDATING.md
index 0abccff..f50e598 100644
--- a/UPDATING.md
+++ b/UPDATING.md
@@ -58,6 +58,12 @@ Dataflow job labeling is now supported in 
Dataflow{Java,Python}Operator with a d
 "airflow-version" label, please upgrade your google-cloud-dataflow or 
apache-beam version
 to 2.2.0 or greater.
 
+### Redshift to S3 Operator
+With Airflow 1.9 or lower, Unload operation always included header row. In 
order to include header row, 
+we need to turn off parallel unload. It is preferred to perform unload 
operation using all nodes so that it is
+faster for larger tables. So, parameter called `include_header` is added and 
default is set to False. 
+Header row will be added only if this parameter is set True and also in that 
case parallel will be automatically turned off (`PARALLEL OFF`)  
+
 ### Google cloud connection string
 
 With Airflow 1.9 or lower there where two connection strings for the Google 
Cloud operators, both `google_cloud_storage_default` and 
`google_cloud_default`. This can be confusing and therefore the 
`google_cloud_storage_default` connection id has been replaced with 
`google_cloud_default` to make the connection id consistent across Airflow.

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a1480431/airflow/operators/redshift_to_s3_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/redshift_to_s3_operator.py 
b/airflow/operators/redshift_to_s3_operator.py
index c3aa4dc..6c2998a 100644
--- a/airflow/operators/redshift_to_s3_operator.py
+++ b/airflow/operators/redshift_to_s3_operator.py
@@ -58,6 +58,7 @@ class RedshiftToS3Transfer(BaseOperator):
             unload_options=tuple(),
             autocommit=False,
             parameters=None,
+            include_header=False,
             *args, **kwargs):
         super(RedshiftToS3Transfer, self).__init__(*args, **kwargs)
         self.schema = schema
@@ -69,6 +70,11 @@ class RedshiftToS3Transfer(BaseOperator):
         self.unload_options = unload_options
         self.autocommit = autocommit
         self.parameters = parameters
+        self.include_header = include_header
+
+        if self.include_header and \
+           'PARALLEL OFF' not in [uo.upper().strip() for uo in unload_options]:
+            self.unload_options = list(unload_options) + ['PARALLEL OFF', ]
 
     def execute(self, context):
         self.hook = PostgresHook(postgres_conn_id=self.redshift_conn_id)
@@ -76,35 +82,56 @@ class RedshiftToS3Transfer(BaseOperator):
         credentials = self.s3.get_credentials()
         unload_options = '\n\t\t\t'.join(self.unload_options)
 
-        self.log.info("Retrieving headers from %s.%s...", self.schema, 
self.table)
+        if self.include_header:
+            self.log.info("Retrieving headers from %s.%s...",
+                          self.schema, self.table)
+
+            columns_query = """SELECT column_name
+                                        FROM information_schema.columns
+                                        WHERE table_schema = '{schema}'
+                                        AND   table_name = '{table}'
+                                        ORDER BY ordinal_position
+                            """.format(schema=self.schema,
+                                       table=self.table)
 
-        columns_query = """SELECT column_name
-                            FROM information_schema.columns
-                            WHERE table_schema = '{0}'
-                            AND   table_name = '{1}'
-                            ORDER BY ordinal_position
-                        """.format(self.schema, self.table)
+            cursor = self.hook.get_conn().cursor()
+            cursor.execute(columns_query)
+            rows = cursor.fetchall()
+            columns = [row[0] for row in rows]
+            column_names = ', '.join("{0}".format(c) for c in columns)
+            column_headers = ', '.join("\\'{0}\\'".format(c) for c in columns)
+            column_castings = ', '.join("CAST({0} AS text) AS {0}".format(c)
+                                        for c in columns)
 
-        cursor = self.hook.get_conn().cursor()
-        cursor.execute(columns_query)
-        rows = cursor.fetchall()
-        columns = [row[0] for row in rows]
-        column_names = ', '.join("\\'{0}\\'".format(c) for c in columns)
-        column_castings = ', '.join("CAST({0} AS text) AS {0}".format(c)
-                                    for c in columns)
+            select_query = """SELECT {column_names} FROM
+                                    (SELECT 2 sort_order, {column_castings}
+                                     FROM {schema}.{table}
+                                    UNION ALL
+                                    SELECT 1 sort_order, {column_headers})
+                                 ORDER BY sort_order"""\
+                            .format(column_names=column_names,
+                                    column_castings=column_castings,
+                                    column_headers=column_headers,
+                                    schema=self.schema,
+                                    table=self.table)
+        else:
+            select_query = "SELECT * FROM {schema}.{table}"\
+                .format(schema=self.schema,
+                        table=self.table)
 
         unload_query = """
-                        UNLOAD ('SELECT {0}
-                        UNION ALL
-                        SELECT {1} FROM {2}.{3}
-                        ORDER BY 1 DESC')
-                        TO 's3://{4}/{5}/{3}_'
-                        with
-                        credentials 
'aws_access_key_id={6};aws_secret_access_key={7}'
-                        {8};
-                        """.format(column_names, column_castings, self.schema, 
self.table,
-                                   self.s3_bucket, self.s3_key, 
credentials.access_key,
-                                   credentials.secret_key, unload_options)
+                    UNLOAD ('{select_query}')
+                    TO 's3://{s3_bucket}/{s3_key}/{table}_'
+                    with credentials
+                    
'aws_access_key_id={access_key};aws_secret_access_key={secret_key}'
+                    {unload_options};
+                    """.format(select_query=select_query,
+                               table=self.table,
+                               s3_bucket=self.s3_bucket,
+                               s3_key=self.s3_key,
+                               access_key=credentials.access_key,
+                               secret_key=credentials.secret_key,
+                               unload_options=unload_options)
 
         self.log.info('Executing UNLOAD command...')
         self.hook.run(unload_query, self.autocommit)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a1480431/tests/operators/test_redshift_to_s3_operator.py
----------------------------------------------------------------------
diff --git a/tests/operators/test_redshift_to_s3_operator.py 
b/tests/operators/test_redshift_to_s3_operator.py
index 06db19c..e214f3d 100644
--- a/tests/operators/test_redshift_to_s3_operator.py
+++ b/tests/operators/test_redshift_to_s3_operator.py
@@ -45,7 +45,7 @@ class TestRedshiftToS3Transfer(unittest.TestCase):
         table = "table"
         s3_bucket = "bucket"
         s3_key = "key"
-        unload_options = ""
+        unload_options = ('PARALLEL OFF',)
 
         t = RedshiftToS3Transfer(
             schema=schema,
@@ -53,33 +53,44 @@ class TestRedshiftToS3Transfer(unittest.TestCase):
             s3_bucket=s3_bucket,
             s3_key=s3_key,
             unload_options=unload_options,
+            include_header=True,
             redshift_conn_id="redshift_conn_id",
             aws_conn_id="aws_conn_id",
             task_id="task_id",
             dag=None)
         t.execute(None)
 
+        unload_options = '\n\t\t\t'.join(unload_options)
+
         columns_query = """
             SELECT column_name
             FROM information_schema.columns
-            WHERE table_schema = '{0}'
-            AND   table_name = '{1}'
+            WHERE table_schema = '{schema}'
+            AND   table_name = '{table}'
             ORDER BY ordinal_position
-            """.format(schema, table)
+            """.format(schema=schema,
+                       table=table)
 
         unload_query = """
-            UNLOAD ('SELECT \\'{0}\\'
-                     UNION ALL
-                     SELECT CAST({0} AS text) AS {0}
-                     FROM {1}.{2}
-                     ORDER BY 1 DESC')
-            TO 's3://{3}/{4}/{2}_'
-            with credentials
-            'aws_access_key_id={5};aws_secret_access_key={6}'
-            {7};
-            """.format(column_name, schema, table,
-                       s3_bucket, s3_key, access_key,
-                       secret_key, unload_options)
+                UNLOAD ('SELECT {column_name} FROM
+                            (SELECT 2 sort_order,
+                             CAST({column_name} AS text) AS {column_name}
+                            FROM {schema}.{table}
+                            UNION ALL
+                            SELECT 1 sort_order, \\'{column_name}\\')
+                         ORDER BY sort_order')
+                TO 's3://{s3_bucket}/{s3_key}/{table}_'
+                with credentials
+                
'aws_access_key_id={access_key};aws_secret_access_key={secret_key}'
+                {unload_options};
+                """.format(column_name=column_name,
+                           schema=schema,
+                           table=table,
+                           s3_bucket=s3_bucket,
+                           s3_key=s3_key,
+                           access_key=access_key,
+                           secret_key=secret_key,
+                           unload_options=unload_options)
 
         def _trim(s):
             return re.sub("\s+", " ", s.strip())

Reply via email to