mik-laj commented on issue #8231: Dag bulk_sync_to_db dag_tag only remove not 
exists
URL: https://github.com/apache/airflow/pull/8231#issuecomment-613511771
 
 
   @zhongjiajie 
   Here are the SQL statements, with their bound parameters, produced by your changes. 
   ```
   root@eeefbb427a9c:/opt/airflow# pytest tests/models/test_dag.py -k 
test_bulk_sync_to_db -s
   
===========================================================================================
 test session starts 
============================================================================================
   platform linux -- Python 3.6.10, pytest-5.4.1, py-1.8.1, pluggy-0.13.1 -- 
/usr/local/bin/python
   cachedir: .pytest_cache
   rootdir: /opt/airflow, inifile: pytest.ini
   plugins: flaky-3.6.1, instafail-0.4.1.post0, requests-mock-1.7.0, 
celery-4.4.2, cov-2.8.1
   collected 63 items / 62 deselected / 1 selected
   
   tests/models/test_dag.py::TestDag::test_bulk_sync_to_db 
========================= AIRFLOW ==========================
   Home of the user: /root
   Airflow home /root/airflow
   Skipping initializing of the DB as it was initialized already.
   You can re-initialize the database by adding --with-db-init flag when 
running tests.
   DELETE FROM dag_tag | {}
   DELETE FROM dag | {}
   [2020-04-14 15:26:58,412] {dag.py:1511} INFO - Sync 4 DAGs
   SELECT dag.dag_id AS dag_dag_id, dag.root_dag_id AS dag_root_dag_id, 
dag.is_paused AS dag_is_paused, dag.is_subdag AS dag_is_subdag, dag.is_active 
AS dag_is_active, dag.last_scheduler_run AS dag_last_scheduler_run, 
dag.last_pickled AS dag_last_pickled, dag.last_expired AS dag_last_expired, 
dag.scheduler_lock AS dag_scheduler_lock, dag.pickle_id AS dag_pickle_id, 
dag.fileloc AS dag_fileloc, dag.owners AS dag_owners, dag.description AS 
dag_description, dag.default_view AS dag_default_view, dag.schedule_interval AS 
dag_schedule_interval, dag_tag_1.name AS dag_tag_1_name, dag_tag_1.dag_id AS 
dag_tag_1_dag_id  FROM dag LEFT OUTER JOIN dag_tag AS dag_tag_1 ON dag.dag_id = 
dag_tag_1.dag_id  WHERE dag.dag_id IN (%(dag_id_1)s, %(dag_id_2)s, 
%(dag_id_3)s, %(dag_id_4)s) FOR UPDATE OF dag | {'dag_id_1': 'dag-bulk-sync-2', 
'dag_id_2': 'dag-bulk-sync-1', 'dag_id_3': 'dag-bulk-sync-0', 'dag_id_4': 
'dag-bulk-sync-3'}
   [2020-04-14 15:26:58,474] {dag.py:1532} INFO - Creating ORM DAG for 
dag-bulk-sync-2
   [2020-04-14 15:26:58,475] {dag.py:1532} INFO - Creating ORM DAG for 
dag-bulk-sync-1
   [2020-04-14 15:26:58,476] {dag.py:1532} INFO - Creating ORM DAG for 
dag-bulk-sync-0
   [2020-04-14 15:26:58,476] {dag.py:1532} INFO - Creating ORM DAG for 
dag-bulk-sync-3
   INSERT INTO dag (dag_id, root_dag_id, is_paused, is_subdag, is_active, 
last_scheduler_run, last_pickled, last_expired, scheduler_lock, pickle_id, 
fileloc, owners, description, default_view, schedule_interval) VALUES 
(%(dag_id)s, %(root_dag_id)s, %(is_paused)s, %(is_subdag)s, %(is_active)s, 
%(last_scheduler_run)s, %(last_pickled)s, %(last_expired)s, %(scheduler_lock)s, 
%(pickle_id)s, %(fileloc)s, %(owners)s, %(description)s, %(default_view)s, 
%(schedule_interval)s) | ({'dag_id': 'dag-bulk-sync-2', 'root_dag_id': None, 
'is_paused': False, 'is_subdag': False, 'is_active': True, 
'last_scheduler_run': datetime.datetime(2020, 4, 14, 15, 26, 58, 412818, 
tzinfo=<Timezone [UTC]>), 'last_pickled': None, 'last_expired': None, 
'scheduler_lock': None, 'pickle_id': None, 'fileloc': 
'/opt/airflow/tests/models/test_dag.py', 'owners': '', 'description': None, 
'default_view': 'tree', 'schedule_interval': '{"type": "timedelta", "attrs": 
{"days": 1, "seconds": 0, "microseconds": 0}}'}, {'dag_id': 'dag-bulk-sync-1', 
'root_dag_id': None, 'is_paused': False, 'is_subdag': False, 'is_active': True, 
'last_scheduler_run': datetime.datetime(2020, 4, 14, 15, 26, 58, 412818, 
tzinfo=<Timezone [UTC]>), 'last_pickled': None, 'last_expired': None, 
'scheduler_lock': None, 'pickle_id': None, 'fileloc': 
'/opt/airflow/tests/models/test_dag.py', 'owners': '', 'description': None, 
'default_view': 'tree', 'schedule_interval': '{"type": "timedelta", "attrs": 
{"days": 1, "seconds": 0, "microseconds": 0}}'}, {'dag_id': 'dag-bulk-sync-0', 
'root_dag_id': None, 'is_paused': False, 'is_subdag': False, 'is_active': True, 
'last_scheduler_run': datetime.datetime(2020, 4, 14, 15, 26, 58, 412818, 
tzinfo=<Timezone [UTC]>), 'last_pickled': None, 'last_expired': None, 
'scheduler_lock': None, 'pickle_id': None, 'fileloc': 
'/opt/airflow/tests/models/test_dag.py', 'owners': '', 'description': None, 
'default_view': 'tree', 'schedule_interval': '{"type": "timedelta", "attrs": 
{"days": 1, "seconds": 0, "microseconds": 0}}'}, {'dag_id': 'dag-bulk-sync-3', 
'root_dag_id': None, 'is_paused': False, 'is_subdag': False, 'is_active': True, 
'last_scheduler_run': datetime.datetime(2020, 4, 14, 15, 26, 58, 412818, 
tzinfo=<Timezone [UTC]>), 'last_pickled': None, 'last_expired': None, 
'scheduler_lock': None, 'pickle_id': None, 'fileloc': 
'/opt/airflow/tests/models/test_dag.py', 'owners': '', 'description': None, 
'default_view': 'tree', 'schedule_interval': '{"type": "timedelta", "attrs": 
{"days": 1, "seconds": 0, "microseconds": 0}}'})
   INSERT INTO dag_tag (name, dag_id) VALUES (%(name)s, %(dag_id)s) | ({'name': 
'test-dag', 'dag_id': 'dag-bulk-sync-0'}, {'name': 'test-dag', 'dag_id': 
'dag-bulk-sync-1'}, {'name': 'test-dag', 'dag_id': 'dag-bulk-sync-2'}, {'name': 
'test-dag', 'dag_id': 'dag-bulk-sync-3'})
   SELECT dag.dag_id AS dag_dag_id  FROM dag | {}
   SELECT dag_tag.dag_id AS dag_tag_dag_id, dag_tag.name AS dag_tag_name  FROM 
dag_tag | {}
   [2020-04-14 15:26:58,497] {dag.py:1511} INFO - Sync 4 DAGs
   SELECT dag.dag_id AS dag_dag_id, dag.root_dag_id AS dag_root_dag_id, 
dag.is_paused AS dag_is_paused, dag.is_subdag AS dag_is_subdag, dag.is_active 
AS dag_is_active, dag.last_scheduler_run AS dag_last_scheduler_run, 
dag.last_pickled AS dag_last_pickled, dag.last_expired AS dag_last_expired, 
dag.scheduler_lock AS dag_scheduler_lock, dag.pickle_id AS dag_pickle_id, 
dag.fileloc AS dag_fileloc, dag.owners AS dag_owners, dag.description AS 
dag_description, dag.default_view AS dag_default_view, dag.schedule_interval AS 
dag_schedule_interval, dag_tag_1.name AS dag_tag_1_name, dag_tag_1.dag_id AS 
dag_tag_1_dag_id  FROM dag LEFT OUTER JOIN dag_tag AS dag_tag_1 ON dag.dag_id = 
dag_tag_1.dag_id  WHERE dag.dag_id IN (%(dag_id_1)s, %(dag_id_2)s, 
%(dag_id_3)s, %(dag_id_4)s) FOR UPDATE OF dag | {'dag_id_1': 'dag-bulk-sync-2', 
'dag_id_2': 'dag-bulk-sync-1', 'dag_id_3': 'dag-bulk-sync-0', 'dag_id_4': 
'dag-bulk-sync-3'}
   UPDATE dag SET last_scheduler_run=%(last_scheduler_run)s WHERE dag.dag_id = 
%(dag_dag_id)s | ({'last_scheduler_run': datetime.datetime(2020, 4, 14, 15, 26, 
58, 497087, tzinfo=<Timezone [UTC]>), 'dag_dag_id': 'dag-bulk-sync-0'}, 
{'last_scheduler_run': datetime.datetime(2020, 4, 14, 15, 26, 58, 497087, 
tzinfo=<Timezone [UTC]>), 'dag_dag_id': 'dag-bulk-sync-1'}, 
{'last_scheduler_run': datetime.datetime(2020, 4, 14, 15, 26, 58, 497087, 
tzinfo=<Timezone [UTC]>), 'dag_dag_id': 'dag-bulk-sync-2'}, 
{'last_scheduler_run': datetime.datetime(2020, 4, 14, 15, 26, 58, 497087, 
tzinfo=<Timezone [UTC]>), 'dag_dag_id': 'dag-bulk-sync-3'})
   [2020-04-14 15:26:58,511] {dag.py:1511} INFO - Sync 4 DAGs
   SELECT dag.dag_id AS dag_dag_id, dag.root_dag_id AS dag_root_dag_id, 
dag.is_paused AS dag_is_paused, dag.is_subdag AS dag_is_subdag, dag.is_active 
AS dag_is_active, dag.last_scheduler_run AS dag_last_scheduler_run, 
dag.last_pickled AS dag_last_pickled, dag.last_expired AS dag_last_expired, 
dag.scheduler_lock AS dag_scheduler_lock, dag.pickle_id AS dag_pickle_id, 
dag.fileloc AS dag_fileloc, dag.owners AS dag_owners, dag.description AS 
dag_description, dag.default_view AS dag_default_view, dag.schedule_interval AS 
dag_schedule_interval, dag_tag_1.name AS dag_tag_1_name, dag_tag_1.dag_id AS 
dag_tag_1_dag_id  FROM dag LEFT OUTER JOIN dag_tag AS dag_tag_1 ON dag.dag_id = 
dag_tag_1.dag_id  WHERE dag.dag_id IN (%(dag_id_1)s, %(dag_id_2)s, 
%(dag_id_3)s, %(dag_id_4)s) FOR UPDATE OF dag | {'dag_id_1': 'dag-bulk-sync-2', 
'dag_id_2': 'dag-bulk-sync-1', 'dag_id_3': 'dag-bulk-sync-0', 'dag_id_4': 
'dag-bulk-sync-3'}
   UPDATE dag SET last_scheduler_run=%(last_scheduler_run)s WHERE dag.dag_id = 
%(dag_dag_id)s | ({'last_scheduler_run': datetime.datetime(2020, 4, 14, 15, 26, 
58, 511952, tzinfo=<Timezone [UTC]>), 'dag_dag_id': 'dag-bulk-sync-0'}, 
{'last_scheduler_run': datetime.datetime(2020, 4, 14, 15, 26, 58, 511952, 
tzinfo=<Timezone [UTC]>), 'dag_dag_id': 'dag-bulk-sync-1'}, 
{'last_scheduler_run': datetime.datetime(2020, 4, 14, 15, 26, 58, 511952, 
tzinfo=<Timezone [UTC]>), 'dag_dag_id': 'dag-bulk-sync-2'}, 
{'last_scheduler_run': datetime.datetime(2020, 4, 14, 15, 26, 58, 511952, 
tzinfo=<Timezone [UTC]>), 'dag_dag_id': 'dag-bulk-sync-3'})
   [2020-04-14 15:26:58,527] {dag.py:1511} INFO - Sync 4 DAGs
   SELECT dag.dag_id AS dag_dag_id, dag.root_dag_id AS dag_root_dag_id, 
dag.is_paused AS dag_is_paused, dag.is_subdag AS dag_is_subdag, dag.is_active 
AS dag_is_active, dag.last_scheduler_run AS dag_last_scheduler_run, 
dag.last_pickled AS dag_last_pickled, dag.last_expired AS dag_last_expired, 
dag.scheduler_lock AS dag_scheduler_lock, dag.pickle_id AS dag_pickle_id, 
dag.fileloc AS dag_fileloc, dag.owners AS dag_owners, dag.description AS 
dag_description, dag.default_view AS dag_default_view, dag.schedule_interval AS 
dag_schedule_interval, dag_tag_1.name AS dag_tag_1_name, dag_tag_1.dag_id AS 
dag_tag_1_dag_id  FROM dag LEFT OUTER JOIN dag_tag AS dag_tag_1 ON dag.dag_id = 
dag_tag_1.dag_id  WHERE dag.dag_id IN (%(dag_id_1)s, %(dag_id_2)s, 
%(dag_id_3)s, %(dag_id_4)s) FOR UPDATE OF dag | {'dag_id_1': 'dag-bulk-sync-2', 
'dag_id_2': 'dag-bulk-sync-1', 'dag_id_3': 'dag-bulk-sync-0', 'dag_id_4': 
'dag-bulk-sync-3'}
   UPDATE dag SET last_scheduler_run=%(last_scheduler_run)s WHERE dag.dag_id = 
%(dag_dag_id)s | ({'last_scheduler_run': datetime.datetime(2020, 4, 14, 15, 26, 
58, 527839, tzinfo=<Timezone [UTC]>), 'dag_dag_id': 'dag-bulk-sync-0'}, 
{'last_scheduler_run': datetime.datetime(2020, 4, 14, 15, 26, 58, 527839, 
tzinfo=<Timezone [UTC]>), 'dag_dag_id': 'dag-bulk-sync-1'}, 
{'last_scheduler_run': datetime.datetime(2020, 4, 14, 15, 26, 58, 527839, 
tzinfo=<Timezone [UTC]>), 'dag_dag_id': 'dag-bulk-sync-2'}, 
{'last_scheduler_run': datetime.datetime(2020, 4, 14, 15, 26, 58, 527839, 
tzinfo=<Timezone [UTC]>), 'dag_dag_id': 'dag-bulk-sync-3'})
   INSERT INTO dag_tag (name, dag_id) VALUES (%(name)s, %(dag_id)s) | ({'name': 
'test-dag2', 'dag_id': 'dag-bulk-sync-0'}, {'name': 'test-dag2', 'dag_id': 
'dag-bulk-sync-1'}, {'name': 'test-dag2', 'dag_id': 'dag-bulk-sync-2'}, 
{'name': 'test-dag2', 'dag_id': 'dag-bulk-sync-3'})
   SELECT dag.dag_id AS dag_dag_id  FROM dag | {}
   SELECT dag_tag.dag_id AS dag_tag_dag_id, dag_tag.name AS dag_tag_name  FROM 
dag_tag | {}
   [2020-04-14 15:26:58,551] {dag.py:1511} INFO - Sync 4 DAGs
   SELECT dag.dag_id AS dag_dag_id, dag.root_dag_id AS dag_root_dag_id, 
dag.is_paused AS dag_is_paused, dag.is_subdag AS dag_is_subdag, dag.is_active 
AS dag_is_active, dag.last_scheduler_run AS dag_last_scheduler_run, 
dag.last_pickled AS dag_last_pickled, dag.last_expired AS dag_last_expired, 
dag.scheduler_lock AS dag_scheduler_lock, dag.pickle_id AS dag_pickle_id, 
dag.fileloc AS dag_fileloc, dag.owners AS dag_owners, dag.description AS 
dag_description, dag.default_view AS dag_default_view, dag.schedule_interval AS 
dag_schedule_interval, dag_tag_1.name AS dag_tag_1_name, dag_tag_1.dag_id AS 
dag_tag_1_dag_id  FROM dag LEFT OUTER JOIN dag_tag AS dag_tag_1 ON dag.dag_id = 
dag_tag_1.dag_id  WHERE dag.dag_id IN (%(dag_id_1)s, %(dag_id_2)s, 
%(dag_id_3)s, %(dag_id_4)s) FOR UPDATE OF dag | {'dag_id_1': 'dag-bulk-sync-2', 
'dag_id_2': 'dag-bulk-sync-1', 'dag_id_3': 'dag-bulk-sync-0', 'dag_id_4': 
'dag-bulk-sync-3'}
   UPDATE dag SET last_scheduler_run=%(last_scheduler_run)s WHERE dag.dag_id = 
%(dag_dag_id)s | ({'last_scheduler_run': datetime.datetime(2020, 4, 14, 15, 26, 
58, 551565, tzinfo=<Timezone [UTC]>), 'dag_dag_id': 'dag-bulk-sync-0'}, 
{'last_scheduler_run': datetime.datetime(2020, 4, 14, 15, 26, 58, 551565, 
tzinfo=<Timezone [UTC]>), 'dag_dag_id': 'dag-bulk-sync-1'}, 
{'last_scheduler_run': datetime.datetime(2020, 4, 14, 15, 26, 58, 551565, 
tzinfo=<Timezone [UTC]>), 'dag_dag_id': 'dag-bulk-sync-2'}, 
{'last_scheduler_run': datetime.datetime(2020, 4, 14, 15, 26, 58, 551565, 
tzinfo=<Timezone [UTC]>), 'dag_dag_id': 'dag-bulk-sync-3'})
   DELETE FROM dag_tag WHERE dag_tag.name = %(name)s AND dag_tag.dag_id = 
%(dag_id)s | ({'name': 'test-dag', 'dag_id': 'dag-bulk-sync-0'}, {'name': 
'test-dag', 'dag_id': 'dag-bulk-sync-1'}, {'name': 'test-dag', 'dag_id': 
'dag-bulk-sync-2'}, {'name': 'test-dag', 'dag_id': 'dag-bulk-sync-3'})
   SELECT dag.dag_id AS dag_dag_id  FROM dag | {}
   SELECT dag_tag.dag_id AS dag_tag_dag_id, dag_tag.name AS dag_tag_name  FROM 
dag_tag | {}
   ```
   I hope they will be helpful. I will try to finish the perf-kit documentation 
today so that you can easily run this experiment yourself without copying the code.
   
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to