Repository: incubator-airflow Updated Branches: refs/heads/master 857850509 -> 8ac87b2e2
[AIRFLOW-1207] Enable utils.helpers unit tests Closes #2300 from skudriashev/airflow-1207 Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/8ac87b2e Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/8ac87b2e Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/8ac87b2e Branch: refs/heads/master Commit: 8ac87b2e284ee710c54085bb418976ccc5f7ea9a Parents: 8578505 Author: Stanislav Kudriashev <stas.kudrias...@gmail.com> Authored: Wed May 17 20:52:18 2017 +0200 Committer: Bolke de Bruin <bo...@xs4all.nl> Committed: Wed May 17 20:52:18 2017 +0200 ---------------------------------------------------------------------- tests/utils/__init__.py | 3 -- tests/utils/compression.py | 97 --------------------------------- tests/utils/dates.py | 45 ---------------- tests/utils/helpers.py | 84 ----------------------------- tests/utils/test_compression.py | 100 +++++++++++++++++++++++++++++++++++ tests/utils/test_dates.py | 46 ++++++++++++++++ tests/utils/test_helpers.py | 90 +++++++++++++++++++++++++++++++ 7 files changed, 236 insertions(+), 229 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/8ac87b2e/tests/utils/__init__.py ---------------------------------------------------------------------- diff --git a/tests/utils/__init__.py b/tests/utils/__init__.py index 6b15998..9d7677a 100644 --- a/tests/utils/__init__.py +++ b/tests/utils/__init__.py @@ -11,6 +11,3 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - -from .compression import * -from .dates import * http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/8ac87b2e/tests/utils/compression.py ---------------------------------------------------------------------- diff --git a/tests/utils/compression.py b/tests/utils/compression.py deleted file mode 100644 index f8e0ebb..0000000 --- a/tests/utils/compression.py +++ /dev/null @@ -1,97 +0,0 @@ -# -*- coding: utf-8 -*- -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from airflow.utils import compression -import unittest -from tempfile import NamedTemporaryFile, mkdtemp -import bz2 -import gzip -import shutil -import logging -import errno -import filecmp - - -class Compression(unittest.TestCase): - - def setUp(self): - self.fn = {} - try: - header = "Sno\tSome,Text \n".encode() - line1 = "1\tAirflow Test\n".encode() - line2 = "2\tCompressionUtil\n".encode() - self.tmp_dir = mkdtemp(prefix='test_utils_compression_') - # create sample txt, gz and bz2 files - with NamedTemporaryFile(mode='wb+', - dir=self.tmp_dir, - delete=False) as f_txt: - self._set_fn(f_txt.name, '.txt') - f_txt.writelines([header, line1, line2]) - fn_gz = self._get_fn('.txt') + ".gz" - with gzip.GzipFile(filename=fn_gz, - mode="wb") as f_gz: - self._set_fn(fn_gz, '.gz') - f_gz.writelines([header, line1, line2]) - fn_bz2 = self._get_fn('.txt') + '.bz2' - with bz2.BZ2File(filename=fn_bz2, - mode="wb") as f_bz2: - self._set_fn(fn_bz2, '.bz2') - f_bz2.writelines([header, line1, line2]) - # Base Exception so it catches Keyboard Interrupt - except BaseException as e: - logging.error(e) - self.tearDown() - - def tearDown(self): - try: - shutil.rmtree(self.tmp_dir) - except OSError as e: - # ENOENT - no such file or directory - if e.errno != errno.ENOENT: - raise e - - # Helper method to create a dictionary of file names and - # file extension - def _set_fn(self, fn, ext): - self.fn[ext] = fn - - # Helper method to fetch a file of a - # certain extension - def _get_fn(self, ext): - return self.fn[ext] - - def test_uncompress_file(self): - # Testing txt file type - self.assertRaisesRegexp(NotImplementedError, - "^Received .txt format. Only gz and bz2.*", - compression.uncompress_file, - **{'input_file_name': None, - 'file_extension': '.txt', - 'dest_dir': None - }) - # Testing gz file type - fn_txt = self._get_fn('.txt') - fn_gz = self._get_fn('.gz') - txt_gz = compression.uncompress_file(fn_gz, '.gz', self.tmp_dir) - self.assertTrue(filecmp.cmp(txt_gz, fn_txt, shallow=False), - msg="Uncompressed file doest match original") - # Testing bz2 file type - fn_bz2 = self._get_fn('.bz2') - txt_bz2 = compression.uncompress_file(fn_bz2, '.bz2', self.tmp_dir) - self.assertTrue(filecmp.cmp(txt_bz2, fn_txt, shallow=False), - msg="Uncompressed file doest match original") - - -if __name__ == '__main__': - unittest.main() http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/8ac87b2e/tests/utils/dates.py ---------------------------------------------------------------------- diff --git a/tests/utils/dates.py b/tests/utils/dates.py deleted file mode 100644 index dc0c87e..0000000 --- a/tests/utils/dates.py +++ /dev/null @@ -1,45 +0,0 @@ -# -*- coding: utf-8 -*- -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from datetime import datetime, timedelta -import unittest - -from airflow.utils import dates - -class Dates(unittest.TestCase): - - def test_days_ago(self): - today = datetime.today() - today_midnight = datetime.fromordinal(today.date().toordinal()) - - self.assertTrue(dates.days_ago(0) == today_midnight) - - self.assertTrue( - dates.days_ago(100) == today_midnight + timedelta(days=-100)) - - self.assertTrue( - dates.days_ago(0, hour=3) == today_midnight + timedelta(hours=3)) - self.assertTrue( - dates.days_ago(0, minute=3) - == today_midnight + timedelta(minutes=3)) - self.assertTrue( - dates.days_ago(0, second=3) - == today_midnight + timedelta(seconds=3)) - self.assertTrue( - dates.days_ago(0, microsecond=3) - == today_midnight + timedelta(microseconds=3)) - - -if __name__ == '__main__': - unittest.main() http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/8ac87b2e/tests/utils/helpers.py ---------------------------------------------------------------------- diff --git a/tests/utils/helpers.py b/tests/utils/helpers.py deleted file mode 100644 index 3ef43f8..0000000 --- a/tests/utils/helpers.py +++ /dev/null @@ -1,84 +0,0 @@ -# -*- coding: utf-8 -*- -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import os -import logging -import multiprocessing -import unittest -import psutil -import signal -import time - -from airflow.utils import helpers - - -class TestHelpers(unittest.TestCase): - @staticmethod - def _ignores_sigterm(child_pid, setup_done): - def signal_handler(signum, frame): - pass - signal.signal(signal.SIGTERM, signal_handler) - child_pid.value = os.getpid() - setup_done.release() - while True: - time.sleep(1) - - @staticmethod - def _parent_of_ignores_sigterm(child_process_killed, child_pid, - process_done, setup_done): - child = multiprocessing.Process(target=TestHelpers._ignores_sigterm, - args=[child_pid, setup_done]) - child.start() - if setup_done.acquire(timeout=1.0): - helpers.kill_process_tree(logging.getLogger(), os.getpid(), timeout=1.0) - # Process.is_alive doesnt work with SIGKILL - if not psutil.pid_exists(child_pid.value): - child_process_killed.value = 1 - process_done.release() - - def test_kill_process_tree(self): - """ Spin up a process that can't be killed by SIGTERM and make sure it gets killed anyway. """ - child_process_killed = multiprocessing.Value('i', 0) - process_done = multiprocessing.Semaphore(0) - child_pid = multiprocessing.Value('i', 0) - setup_done = multiprocessing.Semaphore(0) - args = [child_process_killed, child_pid, process_done, setup_done] - child = multiprocessing.Process(target=TestHelpers._parent_of_ignores_sigterm, args=args) - try: - child.start() - self.assertTrue(process_done.acquire(timeout=5.0)) - self.assertEqual(1, child_process_killed.value) - finally: - try: - os.kill(child_pid.value, signal.SIGKILL) # terminate doesnt work here - except OSError: - pass - child.terminate() - - def test_kill_using_shell(self): - """ Test when no process exists. """ - child_pid = multiprocessing.Value('i', 0) - setup_done = multiprocessing.Semaphore(0) - args = [child_pid, setup_done] - child = multiprocessing.Process(target=TestHelpers._ignores_sigterm, args=args) - child.start() - - self.assertTrue(setup_done.acquire(timeout=1.0)) - pid_to_kill = child_pid.value - self.assertTrue(helpers.kill_using_shell(logging.getLogger(), pid_to_kill, - signal=signal.SIGKILL)) - child.join() # remove orphan process - self.assertFalse(helpers.kill_using_shell(logging.getLogger(), pid_to_kill, - signal=signal.SIGKILL)) - http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/8ac87b2e/tests/utils/test_compression.py ---------------------------------------------------------------------- diff --git a/tests/utils/test_compression.py b/tests/utils/test_compression.py new file mode 100644 index 0000000..2f14d18 --- /dev/null +++ b/tests/utils/test_compression.py @@ -0,0 +1,100 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import bz2 +import errno +import filecmp +import gzip +import logging +import shutil +import tempfile +import unittest + +from airflow.utils import compression + + +class Compression(unittest.TestCase): + + def setUp(self): + self.fn = {} + try: + header = "Sno\tSome,Text \n".encode() + line1 = "1\tAirflow Test\n".encode() + line2 = "2\tCompressionUtil\n".encode() + self.tmp_dir = tempfile.mkdtemp(prefix='test_utils_compression_') + # create sample txt, gz and bz2 files + with tempfile.NamedTemporaryFile(mode='wb+', + dir=self.tmp_dir, + delete=False) as f_txt: + self._set_fn(f_txt.name, '.txt') + f_txt.writelines([header, line1, line2]) + + fn_gz = self._get_fn('.txt') + ".gz" + with gzip.GzipFile(filename=fn_gz, + mode="wb") as f_gz: + self._set_fn(fn_gz, '.gz') + f_gz.writelines([header, line1, line2]) + + fn_bz2 = self._get_fn('.txt') + '.bz2' + with bz2.BZ2File(filename=fn_bz2, mode="wb") as f_bz2: + self._set_fn(fn_bz2, '.bz2') + f_bz2.writelines([header, line1, line2]) + + # Base Exception so it catches Keyboard Interrupt + except BaseException as e: + logging.error(e) + self.tearDown() + + def tearDown(self): + try: + shutil.rmtree(self.tmp_dir) + except OSError as e: + # ENOENT - no such file or directory + if e.errno != errno.ENOENT: + raise e + + # Helper method to create a dictionary of file names and + # file extension + def _set_fn(self, fn, ext): + self.fn[ext] = fn + + # Helper method to fetch a file of a + # certain extension + def _get_fn(self, ext): + return self.fn[ext] + + def test_uncompress_file(self): + # Testing txt file type + self.assertRaisesRegexp(NotImplementedError, + "^Received .txt format. Only gz and bz2.*", + compression.uncompress_file, + **{'input_file_name': None, + 'file_extension': '.txt', + 'dest_dir': None + }) + # Testing gz file type + fn_txt = self._get_fn('.txt') + fn_gz = self._get_fn('.gz') + txt_gz = compression.uncompress_file(fn_gz, '.gz', self.tmp_dir) + self.assertTrue(filecmp.cmp(txt_gz, fn_txt, shallow=False), + msg="Uncompressed file doest match original") + # Testing bz2 file type + fn_bz2 = self._get_fn('.bz2') + txt_bz2 = compression.uncompress_file(fn_bz2, '.bz2', self.tmp_dir) + self.assertTrue(filecmp.cmp(txt_bz2, fn_txt, shallow=False), + msg="Uncompressed file doest match original") + + +if __name__ == '__main__': + unittest.main() http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/8ac87b2e/tests/utils/test_dates.py ---------------------------------------------------------------------- diff --git a/tests/utils/test_dates.py b/tests/utils/test_dates.py new file mode 100644 index 0000000..56fae32 --- /dev/null +++ b/tests/utils/test_dates.py @@ -0,0 +1,46 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from datetime import datetime, timedelta +import unittest + +from airflow.utils import dates + + +class Dates(unittest.TestCase): + + def test_days_ago(self): + today = datetime.today() + today_midnight = datetime.fromordinal(today.date().toordinal()) + + self.assertTrue(dates.days_ago(0) == today_midnight) + + self.assertTrue( + dates.days_ago(100) == today_midnight + timedelta(days=-100)) + + self.assertTrue( + dates.days_ago(0, hour=3) == today_midnight + timedelta(hours=3)) + self.assertTrue( + dates.days_ago(0, minute=3) + == today_midnight + timedelta(minutes=3)) + self.assertTrue( + dates.days_ago(0, second=3) + == today_midnight + timedelta(seconds=3)) + self.assertTrue( + dates.days_ago(0, microsecond=3) + == today_midnight + timedelta(microseconds=3)) + + +if __name__ == '__main__': + unittest.main() http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/8ac87b2e/tests/utils/test_helpers.py ---------------------------------------------------------------------- diff --git a/tests/utils/test_helpers.py b/tests/utils/test_helpers.py new file mode 100644 index 0000000..61f88d6 --- /dev/null +++ b/tests/utils/test_helpers.py @@ -0,0 +1,90 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import multiprocessing +import os +import psutil +import signal +import time +import unittest + +from airflow.utils import helpers + + +class TestHelpers(unittest.TestCase): + + @staticmethod + def _ignores_sigterm(child_pid, setup_done): + def signal_handler(signum, frame): + pass + + signal.signal(signal.SIGTERM, signal_handler) + child_pid.value = os.getpid() + setup_done.release() + while True: + time.sleep(1) + + @staticmethod + def _parent_of_ignores_sigterm(child_process_killed, child_pid, + process_done, setup_done): + child = multiprocessing.Process(target=TestHelpers._ignores_sigterm, + args=[child_pid, setup_done]) + child.start() + if setup_done.acquire(timeout=1.0): + helpers.kill_process_tree(logging.getLogger(), os.getpid(), timeout=1.0) + # Process.is_alive doesnt work with SIGKILL + if not psutil.pid_exists(child_pid.value): + child_process_killed.value = 1 + + process_done.release() + + def test_kill_process_tree(self): + """Spin up a process that can't be killed by SIGTERM and make sure it gets killed anyway.""" + child_process_killed = multiprocessing.Value('i', 0) + process_done = multiprocessing.Semaphore(0) + child_pid = multiprocessing.Value('i', 0) + setup_done = multiprocessing.Semaphore(0) + args = [child_process_killed, child_pid, process_done, setup_done] + child = multiprocessing.Process(target=TestHelpers._parent_of_ignores_sigterm, args=args) + try: + child.start() + self.assertTrue(process_done.acquire(timeout=5.0)) + self.assertEqual(1, child_process_killed.value) + finally: + try: + os.kill(child_pid.value, signal.SIGKILL) # terminate doesnt work here + except OSError: + pass + child.terminate() + + def test_kill_using_shell(self): + """Test when no process exists.""" + child_pid = multiprocessing.Value('i', 0) + setup_done = multiprocessing.Semaphore(0) + args = [child_pid, setup_done] + child = multiprocessing.Process(target=TestHelpers._ignores_sigterm, args=args) + child.start() + + self.assertTrue(setup_done.acquire(timeout=1.0)) + pid_to_kill = child_pid.value + self.assertTrue(helpers.kill_using_shell(logging.getLogger(), pid_to_kill, + signal=signal.SIGKILL)) + child.join() # remove orphan process + self.assertFalse(helpers.kill_using_shell(logging.getLogger(), pid_to_kill, + signal=signal.SIGKILL)) + + +if __name__ == '__main__': + unittest.main()