Repository: incubator-airflow

Updated Branches:
  refs/heads/master a15b7c5b7 -> 49826af10
[AIRFLOW-2300] Add S3 Select functionality to S3ToHiveTransfer

To improve efficiency and usability, this PR adds S3 Select functionality to
S3ToHiveTransfer. It also contains some minor fixes for documents and comments.

Closes #3243 from sekikn/AIRFLOW-2300

Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/49826af1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/49826af1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/49826af1

Branch: refs/heads/master
Commit: 49826af108d2e245ca921944296f24cc73120461
Parents: a15b7c5
Author: Kengo Seki <sek...@apache.org>
Authored: Mon Apr 23 08:57:23 2018 +0200
Committer: Fokko Driesprong <fokkodriespr...@godatadriven.com>
Committed: Mon Apr 23 08:57:23 2018 +0200

----------------------------------------------------------------------
 airflow/hooks/S3_hook.py                 |  6 ++-
 airflow/operators/s3_to_hive_operator.py | 34 ++++++++++++++-
 tests/operators/s3_to_hive_operator.py   | 61 ++++++++++++++++++++++++---
 3 files changed, 90 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/49826af1/airflow/hooks/S3_hook.py
----------------------------------------------------------------------
diff --git a/airflow/hooks/S3_hook.py b/airflow/hooks/S3_hook.py
index 7a4b8b0..edde6ea 100644
--- a/airflow/hooks/S3_hook.py
+++ b/airflow/hooks/S3_hook.py
@@ -194,9 +194,11 @@ class S3Hook(AwsHook):
         :param expression_type: S3 Select expression type
         :type expression_type: str
         :param input_serialization: S3 Select input data serialization format
-        :type input_serialization: str
+        :type input_serialization: dict
         :param output_serialization: S3 Select output data serialization format
-        :type output_serialization: str
+        :type output_serialization: dict
+        :return: retrieved subset of original data by S3 Select
+        :rtype: str
 
         .. seealso::
             For more details about S3 Select parameters:
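For context on the corrected type hints above: both serialization arguments are
plain dicts mirroring the S3 Select request shape, and select_key now documents
that it returns the selected rows as a str. A minimal sketch of calling the hook
directly, assuming an illustrative bucket, key, and connection id that are not
taken from this commit:

    from airflow.hooks.S3_hook import S3Hook

    # Illustrative names; any existing bucket/key/connection would do.
    hook = S3Hook(aws_conn_id='aws_default')
    rows = hook.select_key(
        bucket_name='my-bucket',
        key='data/input.csv',
        expression="SELECT * FROM S3Object s",
        input_serialization={'CSV': {'FieldDelimiter': ','}},  # dict, not str
        output_serialization={'CSV': {}},                      # dict, not str
    )
    # `rows` holds the retrieved subset as a str, per the new :rtype: above.
    print(rows)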
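The operator change above wires select_expression through to S3Hook.select_key,
rejects any compression other than GZIP when a select expression is given, and
loads the result straight into Hive even when headers is set, because
FileHeaderInfo='USE' already strips the header row. A hedged sketch of how a DAG
might use the new parameter; the DAG id, S3 key, field_dict, and Hive table below
are assumptions for illustration only:

    from datetime import datetime

    from airflow import DAG
    from airflow.operators.s3_to_hive_operator import S3ToHiveTransfer

    # Illustrative DAG; id, schedule and start_date are placeholders.
    dag = DAG('s3_select_to_hive_example',
              start_date=datetime(2018, 4, 23),
              schedule_interval=None)

    s3_select_to_hive = S3ToHiveTransfer(
        task_id='s3_select_to_hive',
        s3_key='s3://my-bucket/data/events.csv.gz',     # assumed key
        field_dict={'id': 'BIGINT', 'name': 'STRING'},  # assumed columns
        hive_table='staging.events',                    # assumed table
        delimiter=',',
        headers=True,
        check_headers=False,
        input_compressed=True,   # only GZIP is accepted with S3 Select
        select_expression="SELECT * FROM S3Object s",
        aws_conn_id='aws_default',
        hive_cli_conn_id='hive_cli_default',
        dag=dag,
    )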
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/49826af1/tests/operators/s3_to_hive_operator.py
----------------------------------------------------------------------
diff --git a/tests/operators/s3_to_hive_operator.py b/tests/operators/s3_to_hive_operator.py
index cb8df85..482e7fe 100644
--- a/tests/operators/s3_to_hive_operator.py
+++ b/tests/operators/s3_to_hive_operator.py
@@ -227,7 +227,7 @@ class S3ToHiveTransferTest(unittest.TestCase):
                          format(ext,
                                 ('with' if has_header else 'without'))
                          )
-            self.kwargs['input_compressed'] = (False if ext == '.txt' else True)
+            self.kwargs['input_compressed'] = ext != '.txt'
             self.kwargs['s3_key'] = 's3://bucket/' + self.s3_key + ext
             ip_fn = self._get_fn(ext, self.kwargs['headers'])
             op_fn = self._get_fn(ext, False)
@@ -235,20 +235,67 @@ class S3ToHiveTransferTest(unittest.TestCase):
 
             # Upload the file into the Mocked S3 bucket
             conn.upload_file(ip_fn, 'bucket', self.s3_key + ext)
-            # file paramter to HiveCliHook.load_file is compared
-            # against expected file oputput
+            # file parameter to HiveCliHook.load_file is compared
+            # against expected file output
             mock_hiveclihook().load_file.side_effect = \
                 lambda *args, **kwargs: \
                 self.assertTrue(
-                    self._check_file_equality(args[0],
-                                              op_fn,
-                                              ext
-                                              ),
+                    self._check_file_equality(args[0], op_fn, ext),
                     msg='{0} output file not as expected'.format(ext))
 
             # Execute S3ToHiveTransfer
             s32hive = S3ToHiveTransfer(**self.kwargs)
             s32hive.execute(None)
 
+    @unittest.skipIf(mock is None, 'mock package not present')
+    @unittest.skipIf(mock_s3 is None, 'moto package not present')
+    @mock.patch('airflow.operators.s3_to_hive_operator.HiveCliHook')
+    @mock_s3
+    def test_execute_with_select_expression(self, mock_hiveclihook):
+        conn = boto3.client('s3')
+        conn.create_bucket(Bucket='bucket')
+
+        select_expression = "SELECT * FROM S3Object s"
+        bucket = 'bucket'
+
+        # Only testing S3ToHiveTransfer calls S3Hook.select_key with
+        # the right parameters and its execute method succeeds here,
+        # since Moto doesn't support select_object_content as of 1.3.2.
+        for (ext, has_header) in product(['.txt', '.gz'], [True, False]):
+            input_compressed = ext != '.txt'
+            key = self.s3_key + ext
+
+            self.kwargs['check_headers'] = False
+            self.kwargs['headers'] = has_header
+            self.kwargs['input_compressed'] = input_compressed
+            self.kwargs['select_expression'] = select_expression
+            self.kwargs['s3_key'] = 's3://{0}/{1}'.format(bucket, key)
+
+            ip_fn = self._get_fn(ext, has_header)
+
+            # Upload the file into the Mocked S3 bucket
+            conn.upload_file(ip_fn, bucket, key)
+
+            input_serialization = {
+                'CSV': {'FieldDelimiter': self.delimiter}
+            }
+            if input_compressed:
+                input_serialization['CompressionType'] = 'GZIP'
+            if has_header:
+                input_serialization['CSV']['FileHeaderInfo'] = 'USE'
+
+            # Confirm that select_key was called with the right params
+            with mock.patch('airflow.hooks.S3_hook.S3Hook.select_key',
+                            return_value="") as mock_select_key:
+                # Execute S3ToHiveTransfer
+                s32hive = S3ToHiveTransfer(**self.kwargs)
+                s32hive.execute(None)
+
+                mock_select_key.assert_called_once_with(
+                    bucket_name=bucket, key=key,
+                    expression=select_expression,
+                    input_serialization=input_serialization
+                )
+
 if __name__ == '__main__':
     unittest.main()
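The new test only verifies the parameters passed to S3Hook.select_key, since
moto (as noted in the test comment) does not emulate select_object_content. For
reference, the underlying boto3 request that those parameters map onto looks
roughly like this; a sketch against the public boto3 API with an illustrative
bucket and key, not code from this commit:

    import boto3

    client = boto3.client('s3')
    response = client.select_object_content(
        Bucket='my-bucket',                    # assumed bucket
        Key='data/events.csv.gz',              # assumed key
        Expression="SELECT * FROM S3Object s",
        ExpressionType='SQL',
        InputSerialization={'CSV': {'FileHeaderInfo': 'USE',
                                    'FieldDelimiter': ','},
                            'CompressionType': 'GZIP'},
        OutputSerialization={'CSV': {}},
    )
    # The response payload is an event stream; 'Records' events carry the data.
    selected = b''.join(event['Records']['Payload']
                        for event in response['Payload'] if 'Records' in event)
    print(selected.decode('utf-8'))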