This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 51f9e835bf4 Move `uncompress_file` function from `airflow.utils` to Hive provider (#43526)
51f9e835bf4 is described below

commit 51f9e835bf4e40e418f9f45a510a639068b08fa2
Author: Kaxil Naik <[email protected]>
AuthorDate: Wed Oct 30 22:09:46 2024 +0000

    Move `uncompress_file` function from `airflow.utils` to Hive provider (#43526)
    
    It was only used in S3 To Hive module. One less thing in the `utils` directory.
---
 airflow/utils/compression.py                       | 40 ----------
 .../providers/apache/hive/transfers/s3_to_hive.py  | 19 ++++-
 .../tests/apache/hive/transfers/test_s3_to_hive.py | 20 ++++-
 tests/utils/test_compression.py                    | 87 ----------------------
 4 files changed, 36 insertions(+), 130 deletions(-)

diff --git a/airflow/utils/compression.py b/airflow/utils/compression.py
deleted file mode 100644
index 8f4946346d6..00000000000
--- a/airflow/utils/compression.py
+++ /dev/null
@@ -1,40 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-from __future__ import annotations
-
-import bz2
-import gzip
-import shutil
-from tempfile import NamedTemporaryFile
-
-
-def uncompress_file(input_file_name, file_extension, dest_dir):
-    """Uncompress gz and bz2 files."""
-    if file_extension.lower() not in (".gz", ".bz2"):
-        raise NotImplementedError(
-            f"Received {file_extension} format. Only gz and bz2 files can currently be uncompressed."
-        )
-    if file_extension.lower() == ".gz":
-        fmodule = gzip.GzipFile
-    elif file_extension.lower() == ".bz2":
-        fmodule = bz2.BZ2File
-    with fmodule(input_file_name, mode="rb") as f_compressed, NamedTemporaryFile(
-        dir=dest_dir, mode="wb", delete=False
-    ) as f_uncompressed:
-        shutil.copyfileobj(f_compressed, f_uncompressed)
-    return f_uncompressed.name
diff --git a/providers/src/airflow/providers/apache/hive/transfers/s3_to_hive.py b/providers/src/airflow/providers/apache/hive/transfers/s3_to_hive.py
index 6285103d370..e56a244f71a 100644
--- a/providers/src/airflow/providers/apache/hive/transfers/s3_to_hive.py
+++ b/providers/src/airflow/providers/apache/hive/transfers/s3_to_hive.py
@@ -22,6 +22,7 @@ from __future__ import annotations
 import bz2
 import gzip
 import os
+import shutil
 import tempfile
 from collections.abc import Sequence
 from tempfile import NamedTemporaryFile, TemporaryDirectory
@@ -31,7 +32,6 @@ from airflow.exceptions import AirflowException
 from airflow.models import BaseOperator
 from airflow.providers.amazon.aws.hooks.s3 import S3Hook
 from airflow.providers.apache.hive.hooks.hive import HiveCliHook
-from airflow.utils.compression import uncompress_file
 
 if TYPE_CHECKING:
     from airflow.utils.context import Context
@@ -277,3 +277,20 @@ class S3ToHiveOperator(BaseOperator):
             for line in f_in:
                 f_out.write(line)
         return fn_output
+
+
+def uncompress_file(input_file_name, file_extension, dest_dir):
+    """Uncompress gz and bz2 files."""
+    if file_extension.lower() not in (".gz", ".bz2"):
+        raise NotImplementedError(
+            f"Received {file_extension} format. Only gz and bz2 files can currently be uncompressed."
+        )
+    if file_extension.lower() == ".gz":
+        fmodule = gzip.GzipFile
+    elif file_extension.lower() == ".bz2":
+        fmodule = bz2.BZ2File
+    with fmodule(input_file_name, mode="rb") as f_compressed, NamedTemporaryFile(
+        dir=dest_dir, mode="wb", delete=False
+    ) as f_uncompressed:
+        shutil.copyfileobj(f_compressed, f_uncompressed)
+    return f_uncompressed.name
diff --git a/providers/tests/apache/hive/transfers/test_s3_to_hive.py b/providers/tests/apache/hive/transfers/test_s3_to_hive.py
index 5f738f4b54d..eea3dd39a90 100644
--- a/providers/tests/apache/hive/transfers/test_s3_to_hive.py
+++ b/providers/tests/apache/hive/transfers/test_s3_to_hive.py
@@ -30,7 +30,7 @@ from unittest import mock
 import pytest
 
 from airflow.exceptions import AirflowException
-from airflow.providers.apache.hive.transfers.s3_to_hive import S3ToHiveOperator
+from airflow.providers.apache.hive.transfers.s3_to_hive import S3ToHiveOperator, uncompress_file
 
 boto3 = pytest.importorskip("boto3")
 moto = pytest.importorskip("moto")
@@ -122,7 +122,7 @@ class TestS3ToHiveTransfer:
 
     # Helper method to fetch a file of a
     # certain format (file extension and header)
-    def _get_fn(self, ext, header):
+    def _get_fn(self, ext, header=None):
         key = self._get_key(ext, header)
         return self.file_names[key]
 
@@ -279,3 +279,19 @@ class TestS3ToHiveTransfer:
                     expression=select_expression,
                     input_serialization=input_serialization,
                 )
+
+    def test_uncompress_file(self):
+        # Testing txt file type
+        with pytest.raises(NotImplementedError, match="^Received .txt format. Only gz and bz2.*"):
+            uncompress_file(
+                **{"input_file_name": None, "file_extension": ".txt", "dest_dir": None},
+            )
+        # Testing gz file type
+        fn_txt = self._get_fn(".txt")
+        fn_gz = self._get_fn(".gz")
+        txt_gz = uncompress_file(fn_gz, ".gz", self.tmp_dir)
+        assert filecmp.cmp(txt_gz, fn_txt, shallow=False), "Uncompressed file does not match original"
+        # Testing bz2 file type
+        fn_bz2 = self._get_fn(".bz2")
+        txt_bz2 = uncompress_file(fn_bz2, ".bz2", self.tmp_dir)
+        assert filecmp.cmp(txt_bz2, fn_txt, shallow=False), "Uncompressed file does not match original"
diff --git a/tests/utils/test_compression.py b/tests/utils/test_compression.py
deleted file mode 100644
index 66d4806fd11..00000000000
--- a/tests/utils/test_compression.py
+++ /dev/null
@@ -1,87 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-from __future__ import annotations
-
-import bz2
-import errno
-import filecmp
-import gzip
-import shutil
-import tempfile
-
-import pytest
-
-from airflow.utils import compression
-
-
-class TestCompression:
-    @pytest.fixture(autouse=True)
-    def setup_attrs(self):
-        self.file_names = {}
-        header = b"Sno\tSome,Text \n"
-        line1 = b"1\tAirflow Test\n"
-        line2 = b"2\tCompressionUtil\n"
-        self.tmp_dir = tempfile.mkdtemp(prefix="test_utils_compression_")
-        # create sample txt, gz and bz2 files
-        with tempfile.NamedTemporaryFile(mode="wb+", dir=self.tmp_dir, delete=False) as f_txt:
-            self._set_fn(f_txt.name, ".txt")
-            f_txt.writelines([header, line1, line2])
-
-        fn_gz = self._get_fn(".txt") + ".gz"
-        with gzip.GzipFile(filename=fn_gz, mode="wb") as f_gz:
-            self._set_fn(fn_gz, ".gz")
-            f_gz.writelines([header, line1, line2])
-
-        fn_bz2 = self._get_fn(".txt") + ".bz2"
-        with bz2.BZ2File(filename=fn_bz2, mode="wb") as f_bz2:
-            self._set_fn(fn_bz2, ".bz2")
-            f_bz2.writelines([header, line1, line2])
-
-        yield
-
-        try:
-            shutil.rmtree(self.tmp_dir)
-        except OSError as e:
-            # ENOENT - no such file or directory
-            if e.errno != errno.ENOENT:
-                raise e
-
-    # Helper method to create a dictionary of file names and
-    # file extension
-    def _set_fn(self, fn, ext):
-        self.file_names[ext] = fn
-
-    # Helper method to fetch a file of a
-    # certain extension
-    def _get_fn(self, ext):
-        return self.file_names[ext]
-
-    def test_uncompress_file(self):
-        # Testing txt file type
-        with pytest.raises(NotImplementedError, match="^Received .txt format. Only gz and bz2.*"):
-            compression.uncompress_file(
-                **{"input_file_name": None, "file_extension": ".txt", "dest_dir": None},
-            )
-        # Testing gz file type
-        fn_txt = self._get_fn(".txt")
-        fn_gz = self._get_fn(".gz")
-        txt_gz = compression.uncompress_file(fn_gz, ".gz", self.tmp_dir)
-        assert filecmp.cmp(txt_gz, fn_txt, shallow=False), "Uncompressed file does not match original"
-        # Testing bz2 file type
-        fn_bz2 = self._get_fn(".bz2")
-        txt_bz2 = compression.uncompress_file(fn_bz2, ".bz2", self.tmp_dir)
-        assert filecmp.cmp(txt_bz2, fn_txt, shallow=False), "Uncompressed file does not match original"

Reply via email to