Repository: incubator-airflow
Updated Branches:
  refs/heads/master 857850509 -> 8ac87b2e2


[AIRFLOW-1207] Enable utils.helpers unit tests

Closes #2300 from skudriashev/airflow-1207


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/8ac87b2e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/8ac87b2e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/8ac87b2e

Branch: refs/heads/master
Commit: 8ac87b2e284ee710c54085bb418976ccc5f7ea9a
Parents: 8578505
Author: Stanislav Kudriashev <stas.kudrias...@gmail.com>
Authored: Wed May 17 20:52:18 2017 +0200
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Wed May 17 20:52:18 2017 +0200

----------------------------------------------------------------------
 tests/utils/__init__.py         |   3 --
 tests/utils/compression.py      |  97 ---------------------------------
 tests/utils/dates.py            |  45 ----------------
 tests/utils/helpers.py          |  84 -----------------------------
 tests/utils/test_compression.py | 100 +++++++++++++++++++++++++++++++++++
 tests/utils/test_dates.py       |  46 ++++++++++++++++
 tests/utils/test_helpers.py     |  90 +++++++++++++++++++++++++++++++
 7 files changed, 236 insertions(+), 229 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/8ac87b2e/tests/utils/__init__.py
----------------------------------------------------------------------
diff --git a/tests/utils/__init__.py b/tests/utils/__init__.py
index 6b15998..9d7677a 100644
--- a/tests/utils/__init__.py
+++ b/tests/utils/__init__.py
@@ -11,6 +11,3 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
-from .compression import *
-from .dates import *

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/8ac87b2e/tests/utils/compression.py
----------------------------------------------------------------------
diff --git a/tests/utils/compression.py b/tests/utils/compression.py
deleted file mode 100644
index f8e0ebb..0000000
--- a/tests/utils/compression.py
+++ /dev/null
@@ -1,97 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from airflow.utils import compression
-import unittest
-from tempfile import NamedTemporaryFile, mkdtemp
-import bz2
-import gzip
-import shutil
-import logging
-import errno
-import filecmp
-
-
-class Compression(unittest.TestCase):
-
-    def setUp(self):
-        self.fn = {}
-        try:
-            header = "Sno\tSome,Text \n".encode()
-            line1 = "1\tAirflow Test\n".encode()
-            line2 = "2\tCompressionUtil\n".encode()
-            self.tmp_dir = mkdtemp(prefix='test_utils_compression_')
-            # create sample txt, gz and bz2 files
-            with NamedTemporaryFile(mode='wb+',
-                                    dir=self.tmp_dir,
-                                    delete=False) as f_txt:
-                self._set_fn(f_txt.name, '.txt')
-                f_txt.writelines([header, line1, line2])
-            fn_gz = self._get_fn('.txt') + ".gz"
-            with gzip.GzipFile(filename=fn_gz,
-                               mode="wb") as f_gz:
-                self._set_fn(fn_gz, '.gz')
-                f_gz.writelines([header, line1, line2])
-            fn_bz2 = self._get_fn('.txt') + '.bz2'
-            with bz2.BZ2File(filename=fn_bz2,
-                             mode="wb") as f_bz2:
-                self._set_fn(fn_bz2, '.bz2')
-                f_bz2.writelines([header, line1, line2])
-        # Base Exception so it catches Keyboard Interrupt
-        except BaseException as e:
-            logging.error(e)
-            self.tearDown()
-
-    def tearDown(self):
-        try:
-            shutil.rmtree(self.tmp_dir)
-        except OSError as e:
-            # ENOENT - no such file or directory
-            if e.errno != errno.ENOENT:
-                raise e
-
-    # Helper method to create a dictionary of file names and
-    # file extension
-    def _set_fn(self, fn, ext):
-        self.fn[ext] = fn
-
-    # Helper method to fetch a file of a
-    # certain extension
-    def _get_fn(self, ext):
-        return self.fn[ext]
-
-    def test_uncompress_file(self):
-        # Testing txt file type
-        self.assertRaisesRegexp(NotImplementedError,
-                                "^Received .txt format. Only gz and bz2.*",
-                                compression.uncompress_file,
-                                **{'input_file_name': None,
-                                   'file_extension': '.txt',
-                                   'dest_dir': None
-                                   })
-        # Testing gz file type
-        fn_txt = self._get_fn('.txt')
-        fn_gz = self._get_fn('.gz')
-        txt_gz = compression.uncompress_file(fn_gz, '.gz', self.tmp_dir)
-        self.assertTrue(filecmp.cmp(txt_gz, fn_txt, shallow=False),
-                        msg="Uncompressed file doest match original")
-        # Testing bz2 file type
-        fn_bz2 = self._get_fn('.bz2')
-        txt_bz2 = compression.uncompress_file(fn_bz2, '.bz2', self.tmp_dir)
-        self.assertTrue(filecmp.cmp(txt_bz2, fn_txt, shallow=False),
-                        msg="Uncompressed file doest match original")
-
-
-if __name__ == '__main__':
-    unittest.main()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/8ac87b2e/tests/utils/dates.py
----------------------------------------------------------------------
diff --git a/tests/utils/dates.py b/tests/utils/dates.py
deleted file mode 100644
index dc0c87e..0000000
--- a/tests/utils/dates.py
+++ /dev/null
@@ -1,45 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from datetime import datetime, timedelta
-import unittest
-
-from airflow.utils import dates
-
-class Dates(unittest.TestCase):
-
-    def test_days_ago(self):
-        today = datetime.today()
-        today_midnight = datetime.fromordinal(today.date().toordinal())
-
-        self.assertTrue(dates.days_ago(0) == today_midnight)
-
-        self.assertTrue(
-            dates.days_ago(100) == today_midnight + timedelta(days=-100))
-
-        self.assertTrue(
-            dates.days_ago(0, hour=3) == today_midnight + timedelta(hours=3))
-        self.assertTrue(
-            dates.days_ago(0, minute=3)
-            == today_midnight + timedelta(minutes=3))
-        self.assertTrue(
-            dates.days_ago(0, second=3)
-            == today_midnight + timedelta(seconds=3))
-        self.assertTrue(
-            dates.days_ago(0, microsecond=3)
-            == today_midnight + timedelta(microseconds=3))
-
-
-if __name__ == '__main__':
-    unittest.main()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/8ac87b2e/tests/utils/helpers.py
----------------------------------------------------------------------
diff --git a/tests/utils/helpers.py b/tests/utils/helpers.py
deleted file mode 100644
index 3ef43f8..0000000
--- a/tests/utils/helpers.py
+++ /dev/null
@@ -1,84 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import os
-import logging
-import multiprocessing
-import unittest
-import psutil
-import signal
-import time
-
-from airflow.utils import helpers
-
-
-class TestHelpers(unittest.TestCase):
-    @staticmethod
-    def _ignores_sigterm(child_pid, setup_done):
-        def signal_handler(signum, frame):
-            pass
-        signal.signal(signal.SIGTERM, signal_handler)
-        child_pid.value = os.getpid()
-        setup_done.release()
-        while True:
-            time.sleep(1)
-
-    @staticmethod
-    def _parent_of_ignores_sigterm(child_process_killed, child_pid,
-                                   process_done, setup_done):
-        child = multiprocessing.Process(target=TestHelpers._ignores_sigterm,
-                                        args=[child_pid, setup_done])
-        child.start()
-        if setup_done.acquire(timeout=1.0):
-            helpers.kill_process_tree(logging.getLogger(), os.getpid(), timeout=1.0)
-            # Process.is_alive doesnt work with SIGKILL
-            if not psutil.pid_exists(child_pid.value):
-                child_process_killed.value = 1
-        process_done.release()
-
-    def test_kill_process_tree(self):
-        """ Spin up a process that can't be killed by SIGTERM and make sure it gets killed anyway. """
-        child_process_killed = multiprocessing.Value('i', 0)
-        process_done = multiprocessing.Semaphore(0)
-        child_pid = multiprocessing.Value('i', 0)
-        setup_done = multiprocessing.Semaphore(0)
-        args = [child_process_killed, child_pid, process_done, setup_done]
-        child = multiprocessing.Process(target=TestHelpers._parent_of_ignores_sigterm, args=args)
-        try:
-            child.start()
-            self.assertTrue(process_done.acquire(timeout=5.0))
-            self.assertEqual(1, child_process_killed.value)
-        finally:
-            try:
-                os.kill(child_pid.value, signal.SIGKILL) # terminate doesnt work here
-            except OSError:
-                pass
-            child.terminate()
-
-    def test_kill_using_shell(self):
-        """ Test when no process exists. """
-        child_pid = multiprocessing.Value('i', 0)
-        setup_done = multiprocessing.Semaphore(0)
-        args = [child_pid, setup_done]
-        child = multiprocessing.Process(target=TestHelpers._ignores_sigterm, args=args)
-        child.start()
-
-        self.assertTrue(setup_done.acquire(timeout=1.0))
-        pid_to_kill = child_pid.value
-        self.assertTrue(helpers.kill_using_shell(logging.getLogger(), pid_to_kill,
-                                                 signal=signal.SIGKILL))
-        child.join() # remove orphan process
-        self.assertFalse(helpers.kill_using_shell(logging.getLogger(), pid_to_kill,
-                                                  signal=signal.SIGKILL))
-    

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/8ac87b2e/tests/utils/test_compression.py
----------------------------------------------------------------------
diff --git a/tests/utils/test_compression.py b/tests/utils/test_compression.py
new file mode 100644
index 0000000..2f14d18
--- /dev/null
+++ b/tests/utils/test_compression.py
@@ -0,0 +1,100 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import bz2
+import errno
+import filecmp
+import gzip
+import logging
+import shutil
+import tempfile
+import unittest
+
+from airflow.utils import compression
+
+
+class Compression(unittest.TestCase):
+
+    def setUp(self):
+        self.fn = {}
+        try:
+            header = "Sno\tSome,Text \n".encode()
+            line1 = "1\tAirflow Test\n".encode()
+            line2 = "2\tCompressionUtil\n".encode()
+            self.tmp_dir = tempfile.mkdtemp(prefix='test_utils_compression_')
+            # create sample txt, gz and bz2 files
+            with tempfile.NamedTemporaryFile(mode='wb+',
+                                             dir=self.tmp_dir,
+                                             delete=False) as f_txt:
+                self._set_fn(f_txt.name, '.txt')
+                f_txt.writelines([header, line1, line2])
+
+            fn_gz = self._get_fn('.txt') + ".gz"
+            with gzip.GzipFile(filename=fn_gz,
+                               mode="wb") as f_gz:
+                self._set_fn(fn_gz, '.gz')
+                f_gz.writelines([header, line1, line2])
+
+            fn_bz2 = self._get_fn('.txt') + '.bz2'
+            with bz2.BZ2File(filename=fn_bz2, mode="wb") as f_bz2:
+                self._set_fn(fn_bz2, '.bz2')
+                f_bz2.writelines([header, line1, line2])
+
+        # Base Exception so it catches Keyboard Interrupt
+        except BaseException as e:
+            logging.error(e)
+            self.tearDown()
+
+    def tearDown(self):
+        try:
+            shutil.rmtree(self.tmp_dir)
+        except OSError as e:
+            # ENOENT - no such file or directory
+            if e.errno != errno.ENOENT:
+                raise e
+
+    # Helper method to create a dictionary of file names and
+    # file extension
+    def _set_fn(self, fn, ext):
+        self.fn[ext] = fn
+
+    # Helper method to fetch a file of a
+    # certain extension
+    def _get_fn(self, ext):
+        return self.fn[ext]
+
+    def test_uncompress_file(self):
+        # Testing txt file type
+        self.assertRaisesRegexp(NotImplementedError,
+                                "^Received .txt format. Only gz and bz2.*",
+                                compression.uncompress_file,
+                                **{'input_file_name': None,
+                                   'file_extension': '.txt',
+                                   'dest_dir': None
+                                   })
+        # Testing gz file type
+        fn_txt = self._get_fn('.txt')
+        fn_gz = self._get_fn('.gz')
+        txt_gz = compression.uncompress_file(fn_gz, '.gz', self.tmp_dir)
+        self.assertTrue(filecmp.cmp(txt_gz, fn_txt, shallow=False),
+                        msg="Uncompressed file doest match original")
+        # Testing bz2 file type
+        fn_bz2 = self._get_fn('.bz2')
+        txt_bz2 = compression.uncompress_file(fn_bz2, '.bz2', self.tmp_dir)
+        self.assertTrue(filecmp.cmp(txt_bz2, fn_txt, shallow=False),
+                        msg="Uncompressed file doest match original")
+
+
+if __name__ == '__main__':
+    unittest.main()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/8ac87b2e/tests/utils/test_dates.py
----------------------------------------------------------------------
diff --git a/tests/utils/test_dates.py b/tests/utils/test_dates.py
new file mode 100644
index 0000000..56fae32
--- /dev/null
+++ b/tests/utils/test_dates.py
@@ -0,0 +1,46 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from datetime import datetime, timedelta
+import unittest
+
+from airflow.utils import dates
+
+
+class Dates(unittest.TestCase):
+
+    def test_days_ago(self):
+        today = datetime.today()
+        today_midnight = datetime.fromordinal(today.date().toordinal())
+
+        self.assertTrue(dates.days_ago(0) == today_midnight)
+
+        self.assertTrue(
+            dates.days_ago(100) == today_midnight + timedelta(days=-100))
+
+        self.assertTrue(
+            dates.days_ago(0, hour=3) == today_midnight + timedelta(hours=3))
+        self.assertTrue(
+            dates.days_ago(0, minute=3)
+            == today_midnight + timedelta(minutes=3))
+        self.assertTrue(
+            dates.days_ago(0, second=3)
+            == today_midnight + timedelta(seconds=3))
+        self.assertTrue(
+            dates.days_ago(0, microsecond=3)
+            == today_midnight + timedelta(microseconds=3))
+
+
+if __name__ == '__main__':
+    unittest.main()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/8ac87b2e/tests/utils/test_helpers.py
----------------------------------------------------------------------
diff --git a/tests/utils/test_helpers.py b/tests/utils/test_helpers.py
new file mode 100644
index 0000000..61f88d6
--- /dev/null
+++ b/tests/utils/test_helpers.py
@@ -0,0 +1,90 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import multiprocessing
+import os
+import psutil
+import signal
+import time
+import unittest
+
+from airflow.utils import helpers
+
+
+class TestHelpers(unittest.TestCase):
+
+    @staticmethod
+    def _ignores_sigterm(child_pid, setup_done):
+        def signal_handler(signum, frame):
+            pass
+
+        signal.signal(signal.SIGTERM, signal_handler)
+        child_pid.value = os.getpid()
+        setup_done.release()
+        while True:
+            time.sleep(1)
+
+    @staticmethod
+    def _parent_of_ignores_sigterm(child_process_killed, child_pid,
+                                   process_done, setup_done):
+        child = multiprocessing.Process(target=TestHelpers._ignores_sigterm,
+                                        args=[child_pid, setup_done])
+        child.start()
+        if setup_done.acquire(timeout=1.0):
+            helpers.kill_process_tree(logging.getLogger(), os.getpid(), timeout=1.0)
+            # Process.is_alive doesnt work with SIGKILL
+            if not psutil.pid_exists(child_pid.value):
+                child_process_killed.value = 1
+
+        process_done.release()
+
+    def test_kill_process_tree(self):
+        """Spin up a process that can't be killed by SIGTERM and make sure it gets killed anyway."""
+        child_process_killed = multiprocessing.Value('i', 0)
+        process_done = multiprocessing.Semaphore(0)
+        child_pid = multiprocessing.Value('i', 0)
+        setup_done = multiprocessing.Semaphore(0)
+        args = [child_process_killed, child_pid, process_done, setup_done]
+        child = multiprocessing.Process(target=TestHelpers._parent_of_ignores_sigterm, args=args)
+        try:
+            child.start()
+            self.assertTrue(process_done.acquire(timeout=5.0))
+            self.assertEqual(1, child_process_killed.value)
+        finally:
+            try:
+                os.kill(child_pid.value, signal.SIGKILL) # terminate doesnt work here
+            except OSError:
+                pass
+            child.terminate()
+
+    def test_kill_using_shell(self):
+        """Test when no process exists."""
+        child_pid = multiprocessing.Value('i', 0)
+        setup_done = multiprocessing.Semaphore(0)
+        args = [child_pid, setup_done]
+        child = multiprocessing.Process(target=TestHelpers._ignores_sigterm, args=args)
+        child.start()
+
+        self.assertTrue(setup_done.acquire(timeout=1.0))
+        pid_to_kill = child_pid.value
+        self.assertTrue(helpers.kill_using_shell(logging.getLogger(), pid_to_kill,
+                                                 signal=signal.SIGKILL))
+        child.join() # remove orphan process
+        self.assertFalse(helpers.kill_using_shell(logging.getLogger(), pid_to_kill,
+                                                  signal=signal.SIGKILL))
+
+
+if __name__ == '__main__':
+    unittest.main()

Reply via email to