Fokko closed pull request #3665: [AIRFLOW-2825] Fix S3ToHiveTransfer bug due to
case
URL: https://github.com/apache/incubator-airflow/pull/3665
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git a/airflow/operators/s3_to_hive_operator.py
b/airflow/operators/s3_to_hive_operator.py
index 09eb8363c0..5faaf916b7 100644
--- a/airflow/operators/s3_to_hive_operator.py
+++ b/airflow/operators/s3_to_hive_operator.py
@@ -153,7 +153,7 @@ def execute(self, context):
root, file_ext = os.path.splitext(s3_key_object.key)
if (self.select_expression and self.input_compressed and
- file_ext != '.gz'):
+ file_ext.lower() != '.gz'):
raise AirflowException("GZIP is the only compression " +
"format Amazon S3 Select supports")
diff --git a/tests/operators/s3_to_hive_operator.py
b/tests/operators/s3_to_hive_operator.py
index 482e7fefc8..6ca6274a2c 100644
--- a/tests/operators/s3_to_hive_operator.py
+++ b/tests/operators/s3_to_hive_operator.py
@@ -89,6 +89,11 @@ def setUp(self):
mode="wb") as f_gz_h:
self._set_fn(fn_gz, '.gz', True)
f_gz_h.writelines([header, line1, line2])
+ fn_gz_upper = self._get_fn('.txt', True) + ".GZ"
+ with gzip.GzipFile(filename=fn_gz_upper,
+ mode="wb") as f_gz_upper_h:
+ self._set_fn(fn_gz_upper, '.GZ', True)
+ f_gz_upper_h.writelines([header, line1, line2])
fn_bz2 = self._get_fn('.txt', True) + '.bz2'
with bz2.BZ2File(filename=fn_bz2,
mode="wb") as f_bz2_h:
@@ -105,6 +110,11 @@ def setUp(self):
mode="wb") as f_gz_nh:
self._set_fn(fn_gz, '.gz', False)
f_gz_nh.writelines([line1, line2])
+ fn_gz_upper = self._get_fn('.txt', False) + ".GZ"
+ with gzip.GzipFile(filename=fn_gz_upper,
+ mode="wb") as f_gz_upper_nh:
+ self._set_fn(fn_gz_upper, '.GZ', False)
+ f_gz_upper_nh.writelines([line1, line2])
fn_bz2 = self._get_fn('.txt', False) + '.bz2'
with bz2.BZ2File(filename=fn_bz2,
mode="wb") as f_bz2_nh:
@@ -143,7 +153,7 @@ def _check_file_equality(self, fn_1, fn_2, ext):
# gz files contain mtime and filename in the header that
# causes filecmp to return False even if contents are identical
# Hence decompress to test for equality
- if(ext == '.gz'):
+ if(ext.lower() == '.gz'):
with gzip.GzipFile(fn_1, 'rb') as f_1,\
NamedTemporaryFile(mode='wb') as f_txt_1,\
gzip.GzipFile(fn_2, 'rb') as f_2,\
@@ -220,14 +230,14 @@ def test_execute(self, mock_hiveclihook):
conn.create_bucket(Bucket='bucket')
# Testing txt, zip, bz2 files with and without header row
- for (ext, has_header) in product(['.txt', '.gz', '.bz2'], [True,
False]):
+ for (ext, has_header) in product(['.txt', '.gz', '.bz2', '.GZ'],
[True, False]):
self.kwargs['headers'] = has_header
self.kwargs['check_headers'] = has_header
logging.info("Testing {0} format {1} header".
format(ext,
('with' if has_header else 'without'))
)
- self.kwargs['input_compressed'] = ext != '.txt'
+ self.kwargs['input_compressed'] = ext.lower() != '.txt'
self.kwargs['s3_key'] = 's3://bucket/' + self.s3_key + ext
ip_fn = self._get_fn(ext, self.kwargs['headers'])
op_fn = self._get_fn(ext, False)
@@ -260,8 +270,8 @@ def test_execute_with_select_expression(self,
mock_hiveclihook):
# Only testing S3ToHiveTransfer calls S3Hook.select_key with
# the right parameters and its execute method succeeds here,
# since Moto doesn't support select_object_content as of 1.3.2.
- for (ext, has_header) in product(['.txt', '.gz'], [True, False]):
- input_compressed = ext != '.txt'
+ for (ext, has_header) in product(['.txt', '.gz', '.GZ'], [True,
False]):
+ input_compressed = ext.lower() != '.txt'
key = self.s3_key + ext
self.kwargs['check_headers'] = False
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services