This is an automated email from the ASF dual-hosted git repository.

ash pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 24b74168afe26500abaf82fd388eccee18611c4e
Author: Ash Berlin-Taylor <[email protected]>
AuthorDate: Tue May 19 13:21:44 2020 +0100

    Fix race in Celery tests by pre-creating result tables (#8909)
    
    We noticed our Celery tests failing sometimes with
    
    > (psycopg2.errors.UniqueViolation) duplicate key value violates unique
    > constraint "pg_type_typname_nsp_index"
    > DETAIL:  Key (typname, typnamespace)=(celery_tasksetmeta, 2200) already exists
    
    It appears this is a race condition in SQLAlchemy's "create_all()"
    function, where it first checks which tables exist, builds up a list of
    `CREATE TABLE` statements, then issues them. Thus if two celery worker
    processes start at the same time, they will find the table doesn't
    yet exist, and both try to create it.
    
    This is _probably_ a bug in SQLA, but this should be an easy enough fix
    here, to just ensure that the table exists before launching any Celery tasks.
    
    (cherry picked from commit bae5cc2f5ca32e0f61c3b92008fbd484184448ef)
---
 tests/executors/test_celery_executor.py | 12 ++++++++++++
 1 file changed, 12 insertions(+)

diff --git a/tests/executors/test_celery_executor.py b/tests/executors/test_celery_executor.py
index 511c90d..80671cd 100644
--- a/tests/executors/test_celery_executor.py
+++ b/tests/executors/test_celery_executor.py
@@ -61,6 +61,17 @@ class TestCeleryExecutor(unittest.TestCase):
         patch_app = mock.patch('airflow.executors.celery_executor.app', test_app)
         patch_execute = mock.patch('airflow.executors.celery_executor.execute_command', test_execute)
 
+        backend = test_app.backend
+
+        if hasattr(backend, 'ResultSession'):
+            # Pre-create the database tables now, otherwise SQLA via Celery has a
+            # race condition where one of the subprocesses can die with "Table
+            # already exists" error, because SQLA checks for which tables exist,
+            # then issues a CREATE TABLE, rather than doing CREATE TABLE IF NOT
+            # EXISTS
+            session = backend.ResultSession()
+            session.close()
+
         with patch_app, patch_execute:
             try:
                 yield test_app
@@ -140,6 +151,7 @@ class TestCeleryExecutor(unittest.TestCase):
         self.assertEquals(1, len(executor.queued_tasks))
         self.assertEquals(executor.queued_tasks['key'], value_tuple)
 
+    @pytest.mark.backend("mysql", "postgres")
     def test_exception_propagation(self):
         with self._prepare_app() as app:
             @app.task
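
The check-then-create race described in the commit message, and the effect of pre-creating the table, can be sketched roughly as follows. This is a simplified, deterministic simulation using the standard-library `sqlite3` module rather than SQLAlchemy, Celery, or Postgres. The helper names (`missing_tables`, `create_tables`, `run_two_workers`) are hypothetical, and the two "workers" are interleaved by hand on one connection instead of running as real processes.

```python
import sqlite3

TABLE = "celery_tasksetmeta"  # table name taken from the error message above


def missing_tables(conn):
    """Step 1 of check-then-create: return the tables that do not exist yet."""
    row = conn.execute(
        "SELECT name FROM sqlite_master WHERE type = 'table' AND name = ?",
        (TABLE,),
    ).fetchone()
    return [] if row else [TABLE]


def create_tables(conn, tables):
    """Step 2: issue a plain CREATE TABLE (no IF NOT EXISTS) for each table."""
    for name in tables:
        conn.execute(f"CREATE TABLE {name} (id INTEGER PRIMARY KEY)")


def run_two_workers(conn):
    """Hand-interleave two workers: both check first, then both create."""
    todo_a = missing_tables(conn)
    todo_b = missing_tables(conn)
    errors = []
    for todo in (todo_a, todo_b):
        try:
            create_tables(conn, todo)
        except sqlite3.OperationalError as exc:
            errors.append(str(exc))
    return errors


# Without pre-creation, both workers decide to create the table,
# so the second CREATE TABLE fails.
racy_errors = run_two_workers(sqlite3.connect(":memory:"))

# With pre-creation, both workers find the table and issue no DDL at all.
conn = sqlite3.connect(":memory:")
create_tables(conn, missing_tables(conn))
fixed_errors = run_two_workers(conn)

print(racy_errors)
print(fixed_errors)
```

In the actual test the pre-creation step is the `backend.ResultSession()` call from the diff; the sketch only mirrors the ordering argument, not Celery's internals.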
