This is an automated email from the ASF dual-hosted git repository.

ash pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 229f631f2dafa7a698e634a614c8fa4102f68b38
Author: Joshua Carp <[email protected]>
AuthorDate: Mon Dec 3 04:19:25 2018 -0500

    [AIRFLOW-3367] Run celery integration test with redis broker. (#4207)

    (cherry picked from commit 5710ef2615ad7a24c4b039f491e0fabd942978b3)
---
 tests/executors/test_celery_executor.py | 165 +++++++++++++++++++-------------
 1 file changed, 100 insertions(+), 65 deletions(-)

diff --git a/tests/executors/test_celery_executor.py b/tests/executors/test_celery_executor.py
index f76e588..dcf9910 100644
--- a/tests/executors/test_celery_executor.py
+++ b/tests/executors/test_celery_executor.py
@@ -16,69 +16,103 @@
 # KIND, either express or implied. See the License for the
 # specific language governing permissions and limitations
 # under the License.
+import contextlib
+import os
 import sys
 import unittest
 from multiprocessing import Pool
 
+# leave this it is used by the test worker
+# noinspection PyUnresolvedReferences
+import celery.contrib.testing.tasks  # noqa: F401 pylint: disable=unused-import
 import mock
-
-from celery.contrib.testing.worker import start_worker
 import pytest
+from celery import Celery
 from celery import states as celery_states
+from celery.contrib.testing.worker import start_worker
+from kombu.asynchronous import set_event_loop
+from parameterized import parameterized
 
+from airflow.configuration import conf
 from airflow.executors import celery_executor
-from airflow.executors.celery_executor import (CeleryExecutor, celery_configuration,
-                                               send_task_to_executor, execute_command)
-from airflow.executors.celery_executor import app
 from airflow.utils.state import State
 
-# leave this it is used by the test worker
-import celery.contrib.testing.tasks  # noqa: F401 pylint: disable=ungrouped-imports
+
+def _prepare_test_bodies():
+    if 'CELERY_BROKER_URLS' in os.environ:
+        return [
+            (url, )
+            for url in os.environ['CELERY_BROKER_URLS'].split(',')
+        ]
+    return [(conf.get('celery', 'BROKER_URL'))]
 
 
-class CeleryExecutorTest(unittest.TestCase):
+class TestCeleryExecutor(unittest.TestCase):
+
+    @contextlib.contextmanager
+    def _prepare_app(self, broker_url=None, execute=None):
+        broker_url = broker_url or conf.get('celery', 'BROKER_URL')
+        execute = execute or celery_executor.execute_command.__wrapped__
+
+        test_config = dict(celery_executor.celery_configuration)
+        test_config.update({'broker_url': broker_url})
+        test_app = Celery(broker_url, config_source=test_config)
+        test_execute = test_app.task(execute)
+        patch_app = mock.patch('airflow.executors.celery_executor.app', test_app)
+        patch_execute = mock.patch('airflow.executors.celery_executor.execute_command', test_execute)
+
+        with patch_app, patch_execute:
+            try:
+                yield test_app
+            finally:
+                # Clear event loop to tear down each celery instance
+                set_event_loop(None)
+
+    @parameterized.expand(_prepare_test_bodies())
     @pytest.mark.integration("redis")
     @pytest.mark.integration("rabbitmq")
     @pytest.mark.backend("mysql", "postgres")
-    def test_celery_integration(self):
-        executor = CeleryExecutor()
-        executor.start()
-        with start_worker(app=app, logfile=sys.stdout, loglevel='debug'):
-            success_command = ['true', 'some_parameter']
-            fail_command = ['false', 'some_parameter']
-
-            cached_celery_backend = execute_command.backend
-            task_tuples_to_send = [('success', 'fake_simple_ti', success_command,
-                                    celery_configuration['task_default_queue'],
-                                    execute_command),
-                                   ('fail', 'fake_simple_ti', fail_command,
-                                    celery_configuration['task_default_queue'],
-                                    execute_command)]
-
-            chunksize = executor._num_tasks_per_send_process(len(task_tuples_to_send))
-            num_processes = min(len(task_tuples_to_send), executor._sync_parallelism)
-
-            send_pool = Pool(processes=num_processes)
-            key_and_async_results = send_pool.map(
-                send_task_to_executor,
-                task_tuples_to_send,
-                chunksize=chunksize)
-
-            send_pool.close()
-            send_pool.join()
-
-            for key, command, result in key_and_async_results:
-                # Only pops when enqueued successfully, otherwise keep it
-                # and expect scheduler loop to deal with it.
-                result.backend = cached_celery_backend
-                executor.running[key] = command
-                executor.tasks[key] = result
-                executor.last_state[key] = celery_states.PENDING
-
-            executor.running['success'] = True
-            executor.running['fail'] = True
-
-            executor.end(synchronous=True)
+    def test_celery_integration(self, broker_url):
+        with self._prepare_app(broker_url) as app:
+            executor = celery_executor.CeleryExecutor()
+            executor.start()
+
+            with start_worker(app=app, logfile=sys.stdout, loglevel='debug'):
+                success_command = ['true', 'some_parameter']
+                fail_command = ['false', 'some_parameter']
+
+                cached_celery_backend = celery_executor.execute_command.backend
+                task_tuples_to_send = [('success', 'fake_simple_ti', success_command,
+                                        celery_executor.celery_configuration['task_default_queue'],
+                                        celery_executor.execute_command),
+                                       ('fail', 'fake_simple_ti', fail_command,
+                                        celery_executor.celery_configuration['task_default_queue'],
+                                        celery_executor.execute_command)]
+
+                chunksize = executor._num_tasks_per_send_process(len(task_tuples_to_send))
+                num_processes = min(len(task_tuples_to_send), executor._sync_parallelism)
+
+                send_pool = Pool(processes=num_processes)
+                key_and_async_results = send_pool.map(
+                    celery_executor.send_task_to_executor,
+                    task_tuples_to_send,
+                    chunksize=chunksize)
+
+                send_pool.close()
+                send_pool.join()
+
+                for key, command, result in key_and_async_results:
+                    # Only pops when enqueued successfully, otherwise keep it
+                    # and expect scheduler loop to deal with it.
+                    result.backend = cached_celery_backend
+                    executor.running[key] = command
+                    executor.tasks[key] = result
+                    executor.last_state[key] = celery_states.PENDING
+
+                executor.running['success'] = True
+                executor.running['fail'] = True
+
+                executor.end(synchronous=True)
 
         self.assertTrue(executor.event_buffer['success'], State.SUCCESS)
         self.assertTrue(executor.event_buffer['fail'], State.FAILED)
@@ -93,31 +127,32 @@ class CeleryExecutorTest(unittest.TestCase):
     @pytest.mark.integration("rabbitmq")
     @pytest.mark.backend("mysql", "postgres")
     def test_error_sending_task(self):
-        @app.task
         def fake_execute_command():
             pass
 
-        # fake_execute_command takes no arguments while execute_command takes 1,
-        # which will cause TypeError when calling task.apply_async()
-        celery_executor.execute_command = fake_execute_command
-        executor = CeleryExecutor()
-        value_tuple = 'command', '_', 'queue', 'should_be_a_simple_ti'
-        executor.queued_tasks['key'] = value_tuple
-        executor.heartbeat()
-        self.assertEqual(1, len(executor.queued_tasks))
-        self.assertEqual(executor.queued_tasks['key'], value_tuple)
+        with self._prepare_app(execute=fake_execute_command):
+            # fake_execute_command takes no arguments while execute_command takes 1,
+            # which will cause TypeError when calling task.apply_async()
+            executor = celery_executor.CeleryExecutor()
+            value_tuple = 'command', '_', 'queue', 'should_be_a_simple_ti'
+            executor.queued_tasks['key'] = value_tuple
+            executor.heartbeat()
+            self.assertEquals(1, len(executor.queued_tasks))
+            self.assertEquals(executor.queued_tasks['key'], value_tuple)
 
     def test_exception_propagation(self):
-        @app.task
-        def fake_celery_task():
-            return {}
+        with self._prepare_app() as app:
+            @app.task
+            def fake_celery_task():
+                return {}
+
+            mock_log = mock.MagicMock()
+            executor = celery_executor.CeleryExecutor()
+            executor._log = mock_log
 
-        mock_log = mock.MagicMock()
-        executor = CeleryExecutor()
-        executor._log = mock_log
+            executor.tasks = {'key': fake_celery_task()}
+            executor.sync()
 
-        executor.tasks = {'key': fake_celery_task()}
-        executor.sync()
         assert mock_log.error.call_count == 1
         args, kwargs = mock_log.error.call_args_list[0]
         # Result of queuing is not a celery task but a dict,
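The patch drives parameterized.expand with _prepare_test_bodies(), so one
test case is generated per broker URL in the comma-separated
CELERY_BROKER_URLS environment variable, falling back to the configured
celery BROKER_URL. A minimal, self-contained sketch of that expansion
mechanism; the broker URLs below are illustrative placeholders, not values
taken from this commit:

    import os
    import unittest

    from parameterized import parameterized

    # Illustrative placeholder URLs only; any comma-separated list works.
    os.environ.setdefault(
        'CELERY_BROKER_URLS',
        'redis://localhost:6379/0,amqp://guest@localhost:5672//')


    def _prepare_test_bodies():
        # One (url,) argument tuple per comma-separated broker URL.
        return [(url,) for url in os.environ['CELERY_BROKER_URLS'].split(',')]


    class TestBrokerExpansion(unittest.TestCase):
        @parameterized.expand(_prepare_test_bodies())
        def test_broker(self, broker_url):
            # parameterized calls this once per tuple, passing the URL
            # positionally, which is how test_celery_integration receives it.
            self.assertTrue(broker_url)


    if __name__ == '__main__':
        unittest.main()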

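The _prepare_app context manager is what isolates each parameterized run: it
builds a throwaway Celery app bound to the requested broker, patches it over
the module-level app in airflow.executors.celery_executor, and clears kombu's
cached event loop on exit so consecutive apps do not share state. A
stripped-down sketch of the same patching pattern, assuming a hypothetical
module mymod that exposes a module-level Celery app:

    import contextlib

    import mock
    from celery import Celery
    from kombu.asynchronous import set_event_loop


    @contextlib.contextmanager
    def prepare_app(broker_url):
        # Throwaway Celery app bound to the broker under test.
        test_app = Celery(broker_url, config_source={'broker_url': broker_url})
        # Swap it in for the module-level app while the block runs;
        # 'mymod.app' is a hypothetical patch target, not an Airflow path.
        with mock.patch('mymod.app', test_app):
            try:
                yield test_app
            finally:
                # Reset kombu's cached event loop so the next app starts clean.
                set_event_loop(None)

Usage mirrors the tests above: a test body wrapped in
prepare_app('redis://localhost:6379/0') gets a fresh, broker-specific app,
and resetting the event loop keeps back-to-back runs independent.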