ashb commented on a change in pull request #4207: [AIRFLOW-3367] Run celery
integration test with redis broker.
URL: https://github.com/apache/incubator-airflow/pull/4207#discussion_r238113365
##########
File path: tests/executors/test_celery_executor.py
##########
@@ -38,48 +40,80 @@
import celery.contrib.testing.tasks # noqa: F401
+def _prepare_test_bodies():
+ if 'CELERY_BROKER_URLS' in os.environ:
+ return [
+ (url, )
+ for url in os.environ['CELERY_BROKER_URLS'].split(',')
+ ]
+ return [(configuration.conf.get('celery', 'BROKER_URL'))]
+
+
class CeleryExecutorTest(unittest.TestCase):
+
+ @contextlib.contextmanager
+ def _prepare_app(self, broker_url=None, execute=None):
+ broker_url = broker_url or configuration.conf.get('celery',
'BROKER_URL')
+ execute = execute or celery_executor.execute_command.__wrapped__
+
+ test_config = dict(celery_executor.celery_configuration)
+ test_config.update({'broker_url': broker_url})
+ test_app = Celery(broker_url, config_source=test_config)
+ test_execute = test_app.task(execute)
+ patch_app = mock.patch('airflow.executors.celery_executor.app',
test_app)
+ patch_execute =
mock.patch('airflow.executors.celery_executor.execute_command', test_execute)
+
+ with patch_app, patch_execute:
+ try:
+ yield test_app
Review comment:
Ah so it does. I wasn't familiar with how parameterized works.
Looks good as it is
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services